Posted to jira@kafka.apache.org by "mumrah (via GitHub)" <gi...@apache.org> on 2023/06/02 16:03:06 UTC

[GitHub] [kafka] mumrah opened a new pull request, #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

mumrah opened a new pull request, #13802:
URL: https://github.com/apache/kafka/pull/13802

   This patch adds complete test coverage for the snapshot methods in KRaftMigrationZkWriter.



[GitHub] [kafka] ahuang98 commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "ahuang98 (via GitHub)" <gi...@apache.org>.
ahuang98 commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1220820061


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
             (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
         writer.handleSnapshot(image, consumer);
         assertEquals(1, opCounts.remove("CreateTopic"));
-        assertEquals(1, opCounts.remove("UpdatePartition"));
+        assertEquals(1, opCounts.remove("UpdatePartitions"));
         assertEquals(1, opCounts.remove("UpdateTopic"));
         assertEquals(0, opCounts.size());
         assertEquals("bar", topicClient.createdTopics.get(0));
     }
+
+    @Test
+    public void testDeleteTopicFromSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                visitor.visitTopic("spam", Uuid.randomUuid(), Collections.emptyMap());
+            }
+        };
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .build();
+
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer);
+        assertEquals(1, opCounts.remove("DeleteTopic"));
+        assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+        assertEquals(0, opCounts.size());
+        assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+
+        opCounts.clear();
+        topicClient.reset();
+        writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer);
+        assertEquals(1, opCounts.remove("DeleteTopic"));
+        assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+        assertEquals(2, opCounts.remove("CreateTopic"));
+        assertEquals(0, opCounts.size());
+        assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+        assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics);
+    }
+
+    @FunctionalInterface
+    interface TopicVerifier {
+        void verify(Uuid topicId, TopicsImage topicsImage, CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer);
+    }
+
+    void setupTopicWithTwoPartitions(TopicVerifier verifier) {
+        // Set up a topic with two partitions in ZK (via iterateTopics) and a KRaft TopicsImage, then run the given verifier
+        Uuid topicId = Uuid.randomUuid();
+        Map<Integer, PartitionRegistration> partitionMap = new HashMap<>();
+        partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, -1));
+        partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, -1));
+
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                Map<Integer, List<Integer>> assignments = new HashMap<>();
+                assignments.put(0, Arrays.asList(2, 3, 4));
+                assignments.put(1, Arrays.asList(3, 4, 5));
+                visitor.visitTopic("spam", topicId, assignments);
+                visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 0)), partitionMap.get(0));
+                visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 1)), partitionMap.get(1));
+            }
+        };
+
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+        delta.replay(new TopicRecord().setTopicId(topicId).setName("spam"));
+        delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 0).message());
+        delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 1).message());
+        TopicsImage image = delta.apply();
+
+        verifier.verify(topicId, image, topicClient, writer);
+    }
+
+    @Test
+    public void testUpdatePartitionsFromSnapshot() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(0, opCounts.size(), "No operations expected since the data is the same");
+
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
+            topicsImage = topicsDelta.apply();
+
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testTopicReassignmentDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(1, topicClient.updatedTopicPartitions.get("spam").size());
+            assertEquals(Collections.singleton(0), topicClient.updatedTopicPartitions.get("spam"));
+        });
+    }
+
+    @Test
+    public void testNewTopicSnapshot() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            Uuid newTopicId = Uuid.randomUuid();
+            topicsDelta.replay(new TopicRecord().setTopicId(newTopicId).setName("new"));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0, 1, 2)));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            Uuid newTopicId = Uuid.randomUuid();
+            topicsDelta.replay(new TopicRecord().setTopicId(newTopicId).setName("new"));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0, 1, 2)));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionRecord().setTopicId(topicId).setPartitionId(2).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)).setLeader(1));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(1, opCounts.remove("UpdatePartitions"));

Review Comment:
   interesting, I would have thought this would count as two `UpdatePartitions` ops, since we replayed two partition change records
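
The count of 1 is consistent with the writer grouping all changed partitions of a topic into a single "UpdatePartitions" operation rather than emitting one op per partition, which is what the assertions in this test imply. A minimal, self-contained sketch of that per-topic grouping follows; it is purely illustrative and not the actual KRaftMigrationZkWriter code:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class UpdatePartitionsGroupingSketch {
        public static void main(String[] args) {
            // Two changed partitions of the same topic, mirroring the test above.
            List<String> changedTopicPartitions = Arrays.asList("spam-0", "spam-1");

            // Group the changed partitions by topic name.
            Map<String, List<String>> byTopic = new HashMap<>();
            for (String tp : changedTopicPartitions) {
                String topic = tp.substring(0, tp.lastIndexOf('-'));
                byTopic.computeIfAbsent(topic, k -> new ArrayList<>()).add(tp);
            }

            // One "UpdatePartitions" operation is emitted per topic, covering all
            // of its changed partitions, so the counter ends at 1 rather than 2.
            Map<String, Integer> opCounts = new HashMap<>();
            byTopic.forEach((topic, partitions) ->
                opCounts.merge("UpdatePartitions", 1, Integer::sum));
            System.out.println(opCounts); // prints {UpdatePartitions=1}
        }
    }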



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+    @Test
+    public void testDeleteTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new RemoveTopicRecord().setTopicId(topicId));
+            TopicsImage newTopicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            Map<Uuid, String> emptyTopicNames = Collections.emptyMap();
+            assertThrows(RuntimeException.class,

Review Comment:
   nit: add a comment? e.g. // check we throw an exception if we cannot find the deleted topic
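
For reference, a sketch of the suggested nit applied to the assertion in question (the continuation of the `assertThrows` appears in full later in the thread; the comment wording is the reviewer's suggestion, not code from the PR):

    Map<Uuid, String> emptyTopicNames = Collections.emptyMap();
    // Check that we throw an exception if we cannot find the deleted topic's name.
    assertThrows(RuntimeException.class,
        () -> writer.handleTopicsDelta(emptyTopicNames::get, newTopicsImage, topicsDelta, consumer));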



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -62,19 +62,19 @@
 
 public class KRaftMigrationZkWriter {
 
-    private static final String UPDATE_PRODUCER_ID = "UpdateProducerId";
-    private static final String CREATE_TOPIC = "CreateTopic";
-    private static final String UPDATE_TOPIC = "UpdateTopic";
-    private static final String DELETE_TOPIC = "DeleteTopic";
-    private static final String UPDATE_PARTITON = "UpdatePartition";
-    private static final String DELETE_PARTITION = "DeletePartition";
-    private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
-    private static final String DELETE_BROKER_CONFIG = "DeleteBrokerConfig";
-    private static final String UPDATE_TOPIC_CONFIG = "UpdateTopicConfig";
-    private static final String DELETE_TOPIC_CONFIG = "DeleteTopicConfig";
-    private static final String UPDATE_CLIENT_QUOTA = "UpdateClientQuota";
-    private static final String UPDATE_ACL = "UpdateAcl";
-    private static final String DELETE_ACL = "DeleteAcl";
+    static final String UPDATE_PRODUCER_ID = "UpdateProducerId";
+    static final String CREATE_TOPIC = "CreateTopic";
+    static final String UPDATE_TOPIC = "UpdateTopic";
+    static final String DELETE_TOPIC = "DeleteTopic";
+    static final String UPDATE_PARTITIONS = "UpdatePartitions";
+    static final String DELETE_PARTITION = "DeletePartition";
+    static final String UPDATE_BROKER_CONFIGS = "UpdateBrokerConfigs";
+    static final String DELETE_BROKER_CONFIGS = "DeleteBrokerConfigs";
+    static final String UPDATE_TOPIC_CONFIGS = "UpdateTopicConfigs";
+    static final String DELETE_TOPIC_CONFIGS = "DeleteTopicConfigs";
+    static final String UPDATE_CLIENT_QUOTAS = "UpdateClientQuotas";

Review Comment:
   no DELETE_CLIENT_QUOTAS?



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+    @Test
+    public void testDeleteTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new RemoveTopicRecord().setTopicId(topicId));
+            TopicsImage newTopicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            Map<Uuid, String> emptyTopicNames = Collections.emptyMap();
+            assertThrows(RuntimeException.class,
+                () -> writer.handleTopicsDelta(emptyTopicNames::get, newTopicsImage, topicsDelta, consumer));
+
+            Map<Uuid, String> topicNames = Collections.singletonMap(topicId, "spam");
+            writer.handleTopicsDelta(topicNames::get, newTopicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("DeleteTopic"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+        });
+    }
+
+    @Test
+    public void testBrokerConfigDelta() {
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setConfigMigrationClient(configClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("spam").setValue(null));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue(null));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsDelta(image, delta, consumer);
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "b0"))
+        );
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0"))
+        );
+        assertTrue(
+            configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1"))
+        );
+    }
+
+    @Test
+    public void testBrokerConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> b0 = new HashMap<>();
+                b0.put("foo", "bar");
+                b0.put("spam", "eggs");
+                configConsumer.accept("0", b0);
+                configConsumer.accept("1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .setConfigMigrationClient(configClient)
+                .setAclMigrationClient(aclClient)
+                .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.BROKER, "3")),
+            "Broker 3 is not in the ConfigurationsImage, it should get deleted");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "0")),
+            "Broker 0 only has foo=bar in image, should overwrite the ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new ConfigResource(ConfigResource.Type.BROKER, "1")),
+            "Broker 1 config is the same in image, so no write should happen");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "2")),
+            "Broker 2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_BROKER_CONFIGS));
+        assertEquals(1, opCounts.get(DELETE_BROKER_CONFIGS));
+    }
+
+    @Test
+    public void testTopicConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> topic0 = new HashMap<>();
+                topic0.put("foo", "bar");
+                topic0.put("spam", "eggs");
+                configConsumer.accept("topic-0", topic0);
+                configConsumer.accept("topic-1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("topic-3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-3")),
+                "Topic topic-3 is not in the ConfigurationsImage, it should get deleted");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0")),
+                "Topic topic-0 only has foo=bar in image, should overwrite the ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1")),
+                "Topic topic-1 config is the same in image, so no write should happen");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-2")),
+                "Topic topic-2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_TOPIC_CONFIGS));
+        assertEquals(1, opCounts.get(DELETE_TOPIC_CONFIGS));
+    }
+
+    @Test
+    public void testInvalidConfigSnapshot() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType((byte) 99).setResourceName("resource").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        assertThrows(RuntimeException.class, () -> writer.handleConfigsSnapshot(image, consumer),
+            "Should throw due to invalid resource in image");
+    }
+
+    @Test
+    public void testProducerIdSnapshot() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        migrationClient.setReadProducerId(new ProducerIdsBlock(0, 100L, 1000));
+
+        {
+            // No change
+            ProducerIdsImage image = new ProducerIdsImage(1100);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(0, opCounts.size());
+        }
+
+        {
+            // KRaft differs from ZK

Review Comment:
   Should we add a test case to confirm we throw an exception (or however we want to handle it) if KRaft sees a nextProducerId lower than what ZK sees? e.g. `writer.handleProducerIdSnapshot(new ProducerIdsImage(100), consumer);`
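
A sketch of the suggested test, reusing the scaffolding of testProducerIdSnapshot above. It assumes the writer would be changed to reject a KRaft nextProducerId that lags the block already written to ZK; the current code may handle this differently, so the expected exception is an assumption:

    @Test
    public void testProducerIdSnapshotRegression() {
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(0)
            .build();
        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);

        // ZK has already handed out the block [100, 1100).
        migrationClient.setReadProducerId(new ProducerIdsBlock(0, 100L, 1000));

        Map<String, Integer> opCounts = new HashMap<>();
        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));

        // KRaft reports nextProducerId 100, behind ZK's 1100; expect a failure
        // (assumed behavior, per the review question above).
        assertThrows(RuntimeException.class,
            () -> writer.handleProducerIdSnapshot(new ProducerIdsImage(100), consumer));
    }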



+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionRecord().setTopicId(topicId).setPartitionId(2).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)).setLeader(1));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(2, topicClient.updatedTopics.get("spam").size());
+            assertEquals(new HashSet<>(Arrays.asList(0, 1)), topicClient.updatedTopicPartitions.get("spam"));
+        });
+    }
+
+    @Test
+    public void testDeleteTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new RemoveTopicRecord().setTopicId(topicId));
+            TopicsImage newTopicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            Map<Uuid, String> emptyTopicNames = Collections.emptyMap();
+            assertThrows(RuntimeException.class,
+                () -> writer.handleTopicsDelta(emptyTopicNames::get, newTopicsImage, topicsDelta, consumer));
+
+            Map<Uuid, String> topicNames = Collections.singletonMap(topicId, "spam");
+            writer.handleTopicsDelta(topicNames::get, newTopicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("DeleteTopic"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+        });
+    }
+
+    @Test
+    public void testBrokerConfigDelta() {
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setConfigMigrationClient(configClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("spam").setValue(null));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue(null));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsDelta(image, delta, consumer);
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "b0"))
+        );
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0"))
+        );
+        assertTrue(
+            configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1"))
+        );
+    }
+
+    @Test
+    public void testBrokerConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> b0 = new HashMap<>();
+                b0.put("foo", "bar");
+                b0.put("spam", "eggs");
+                configConsumer.accept("0", b0);
+                configConsumer.accept("1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .setConfigMigrationClient(configClient)
+                .setAclMigrationClient(aclClient)
+                .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.BROKER, "3")),
+            "Broker 3 is not in the ConfigurationsImage, it should get deleted");

Review Comment:
   Interesting, I didn't realize that config deltas contain all the unchanged values as well; that seems a bit unwieldy.
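   For illustration only (not part of the patch): because the delta here is
   built on ConfigurationsImage.EMPTY, every replayed record ends up in it,
   including values identical to what ZK already holds. A minimal sketch
   using only the calls already shown in the diff:

       ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
       // foo=bar may match broker 1's existing ZK config, but the delta still
       // carries it; skipping the no-op write is left to the snapshot handler,
       // which diffs the resulting image against iterateBrokerConfigs.
       delta.replay(new ConfigRecord()
           .setResourceType(ConfigResource.Type.BROKER.id())
           .setResourceName("1")
           .setName("foo")
           .setValue("bar"));
       ConfigurationsImage image = delta.apply();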



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+    @Test
+    public void testBrokerConfigDelta() {
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setConfigMigrationClient(configClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("spam").setValue(null));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue(null));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsDelta(image, delta, consumer);
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "b0"))
+        );
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0"))
+        );
+        assertTrue(
+            configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1"))
+        );

Review Comment:
   No opCounts checks for this test?
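   For example, assertions in the style of the snapshot tests below would
   work; the operation names and counts here are assumptions, not taken
   from the patch:

       assertEquals(1, opCounts.remove(UPDATE_BROKER_CONFIG));
       assertEquals(1, opCounts.remove(UPDATE_TOPIC_CONFIG));
       assertEquals(1, opCounts.remove(DELETE_TOPIC_CONFIG));
       assertEquals(0, opCounts.size());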



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+    @Test
+    public void testBrokerConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> b0 = new HashMap<>();
+                b0.put("foo", "bar");
+                b0.put("spam", "eggs");
+                configConsumer.accept("0", b0);
+                configConsumer.accept("1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .setConfigMigrationClient(configClient)
+                .setAclMigrationClient(aclClient)
+                .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.BROKER, "3")),
+            "Broker 3 is not in the ConfigurationsImage, it should get deleted");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "0")),
+            "Broker 0 only has foo=bar in image, should overwrite the ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new ConfigResource(ConfigResource.Type.BROKER, "1")),
+            "Broker 1 config is the same in image, so no write should happen");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "2")),
+            "Broker 2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_BROKER_CONFIG));
+        assertEquals(1, opCounts.get(DELETE_BROKER_CONFIG));
+    }
+
+    @Test
+    public void testTopicConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> topic0 = new HashMap<>();
+                topic0.put("foo", "bar");
+                topic0.put("spam", "eggs");
+                configConsumer.accept("topic-0", topic0);
+                configConsumer.accept("topic-1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("topic-3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-3")),
+                "Topic topic-3 is not in the ConfigurationsImage, it should get deleted");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0")),
+                "Topic topic-0 only has foo=bar in image, should overwrite the ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1")),
+                "Topic topic-1 config is the same in image, so no write should happen");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-2")),
+                "Topic topic-2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_TOPIC_CONFIG));
+        assertEquals(1, opCounts.get(DELETE_TOPIC_CONFIG));
+    }
+
+    @Test
+    public void testInvalidConfigSnapshot() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType((byte) 99).setResourceName("resource").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        assertThrows(RuntimeException.class, () -> writer.handleConfigsSnapshot(image, consumer),
+            "Should throw due to invalid resource in image");
+    }
+
+    @Test
+    public void testProducerIdSnapshot() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        migrationClient.setReadProducerId(new ProducerIdsBlock(0, 100L, 1000));
+
+        {
+            // No change
+            ProducerIdsImage image = new ProducerIdsImage(1100);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(0, opCounts.size());
+        }
+
+        {
+            // KRaft differs from ZK
+            ProducerIdsImage image = new ProducerIdsImage(2000);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(1, opCounts.size());
+            assertEquals(2000, migrationClient.capturedProducerId);
+        }
+
+        {
+            // "Empty" state in ZK (shouldn't really happen, but good to check)
+            ProducerIdsImage image = new ProducerIdsImage(2000);
+            migrationClient.setReadProducerId(ProducerIdsBlock.EMPTY);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(1, opCounts.size());
+            assertEquals(2000, migrationClient.capturedProducerId);
+        }
+
+        {
+            // No state in ZK
+            ProducerIdsImage image = new ProducerIdsImage(2000);
+            migrationClient.setReadProducerId(null);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(1, opCounts.size());
+            assertEquals(2000, migrationClient.capturedProducerId);
+        }
+    }
+
+    @Test
+    public void testProducerIdDelta() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        migrationClient.setReadProducerId(new ProducerIdsBlock(0, 100L, 1000));
+
+        // No change

Review Comment:
   Do we need this comment? There is a change, since the delta is setting the next producer ID to 2000, right?
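   (The quoted hunk cuts off before the delta itself. Assuming the same
   image/delta pattern used elsewhere in this file, the step in question
   presumably looks something like the sketch below; the ProducerIdsDelta
   and ProducerIdsRecord usage is an assumption, not quoted from the patch.)

       // Hypothetical: ZK holds block (0, 100, 1000), i.e. next ID 1100,
       // while the delta advances the next producer ID to 2000, so there
       // is indeed a change to write back.
       ProducerIdsDelta delta = new ProducerIdsDelta(new ProducerIdsImage(1100));
       delta.replay(new ProducerIdsRecord()
           .setBrokerId(0)
           .setBrokerEpoch(1)
           .setNextProducerId(2000));
       ProducerIdsImage image = delta.apply();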



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
             (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
         writer.handleSnapshot(image, consumer);
         assertEquals(1, opCounts.remove("CreateTopic"));
-        assertEquals(1, opCounts.remove("UpdatePartition"));
+        assertEquals(1, opCounts.remove("UpdatePartitions"));
         assertEquals(1, opCounts.remove("UpdateTopic"));
         assertEquals(0, opCounts.size());
         assertEquals("bar", topicClient.createdTopics.get(0));
     }
+
+    @Test
+    public void testDeleteTopicFromSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                visitor.visitTopic("spam", Uuid.randomUuid(), Collections.emptyMap());
+            }
+        };
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .build();
+
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer);
+        assertEquals(1, opCounts.remove("DeleteTopic"));
+        assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+        assertEquals(0, opCounts.size());
+        assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+
+        opCounts.clear();
+        topicClient.reset();
+        writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer);
+        assertEquals(1, opCounts.remove("DeleteTopic"));
+        assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+        assertEquals(2, opCounts.remove("CreateTopic"));
+        assertEquals(0, opCounts.size());
+        assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+        assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics);
+    }
+
+    @FunctionalInterface
+    interface TopicVerifier {
+        void verify(Uuid topicId, TopicsImage topicsImage, CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer);
+    }
+
+    void setupTopicWithTwoPartitions(TopicVerifier verifier) {
+        // Set up a topic with two partitions in ZK (via iterateTopics) and a KRaft TopicsImage, then run the given verifier
+        Uuid topicId = Uuid.randomUuid();
+        Map<Integer, PartitionRegistration> partitionMap = new HashMap<>();
+        partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, -1));
+        partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, -1));
+
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                Map<Integer, List<Integer>> assignments = new HashMap<>();
+                assignments.put(0, Arrays.asList(2, 3, 4));
+                assignments.put(1, Arrays.asList(3, 4, 5));
+                visitor.visitTopic("spam", topicId, assignments);
+                visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 0)), partitionMap.get(0));
+                visitor.visitPartition(new TopicIdPartition(topicId, new TopicPartition("spam", 1)), partitionMap.get(1));
+            }
+        };
+
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
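+        // Mirror the ZK state above in a KRaft TopicsImage so each test starts from a consistent baseline.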
+        TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+        delta.replay(new TopicRecord().setTopicId(topicId).setName("spam"));
+        delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 0).message());
+        delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 1).message());
+        TopicsImage image = delta.apply();
+
+        verifier.verify(topicId, image, topicClient, writer);
+    }
+
+    @Test
+    public void testUpdatePartitionsFromSnapshot() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(0, opCounts.size(), "No operations expected since the data is the same");
+
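+            // Mutate both partitions so the next snapshot reconciliation sees drift between the image and ZK.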
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
+            topicsImage = topicsDelta.apply();
+
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testTopicReassignmentDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(1, topicClient.updatedTopicPartitions.get("spam").size());
+            assertEquals(Collections.singleton(0), topicClient.updatedTopicPartitions.get("spam"));
+        });
+    }
+
+    @Test
+    public void testNewTopicSnapshot() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
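+            // Add a brand-new topic and change an existing partition; the snapshot should create one and update the other.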
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            Uuid newTopicId = Uuid.randomUuid();
+            topicsDelta.replay(new TopicRecord().setTopicId(newTopicId).setName("new"));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0, 1, 2)));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            Uuid newTopicId = Uuid.randomUuid();
+            topicsDelta.replay(new TopicRecord().setTopicId(newTopicId).setName("new"));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0, 1, 2)));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
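+            // Adding partition 2 should rewrite the topic assignment in ZK and write state for the new partition.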
+            topicsDelta.replay(new PartitionRecord().setTopicId(topicId).setPartitionId(2).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)).setLeader(1));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(2, topicClient.updatedTopics.get("spam").size());
+            assertEquals(new HashSet<>(Arrays.asList(0, 1)), topicClient.updatedTopicPartitions.get("spam"));
+        });
+    }
+
+    @Test
+    public void testDeleteTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new RemoveTopicRecord().setTopicId(topicId));
+            TopicsImage newTopicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
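+            // Deleting a topic requires resolving its name from the ID; an empty mapping should fail fast.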
+            Map<Uuid, String> emptyTopicNames = Collections.emptyMap();
+            assertThrows(RuntimeException.class,
+                () -> writer.handleTopicsDelta(emptyTopicNames::get, newTopicsImage, topicsDelta, consumer));
+
+            Map<Uuid, String> topicNames = Collections.singletonMap(topicId, "spam");
+            writer.handleTopicsDelta(topicNames::get, newTopicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("DeleteTopic"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+        });
+    }
+
+    @Test
+    public void testBrokerConfigDelta() {
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setConfigMigrationClient(configClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("spam").setValue(null));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue(null));
+
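+        // Null values delete config keys: "spam" is dropped from b0, and topic-1 ends up empty so its resource is deleted.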
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsDelta(image, delta, consumer);
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "b0"))
+        );
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0"))
+        );
+        assertTrue(
+            configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1"))
+        );
+    }
+
+    @Test
+    public void testBrokerConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> b0 = new HashMap<>();
+                b0.put("foo", "bar");
+                b0.put("spam", "eggs");
+                configConsumer.accept("0", b0);
+                configConsumer.accept("1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .setConfigMigrationClient(configClient)
+                .setAclMigrationClient(aclClient)
+                .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.BROKER, "3")),
+            "Broker 3 is not in the ConfigurationsImage, it should get deleted");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "0")),
+            "Broker 0 only has foo=bar in image, should overwrite the ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new ConfigResource(ConfigResource.Type.BROKER, "1")),
+            "Broker 1 config is the same in image, so no write should happen");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "2")),
+            "Broker 2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_BROKER_CONFIG));
+        assertEquals(1, opCounts.get(DELETE_BROKER_CONFIG));
+    }
+
+    @Test
+    public void testTopicConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> topic0 = new HashMap<>();
+                topic0.put("foo", "bar");
+                topic0.put("spam", "eggs");
+                configConsumer.accept("topic-0", topic0);
+                configConsumer.accept("topic-1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("topic-3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-3")),
+                "Topic topic-3 is not in the ConfigurationsImage, it should get deleted");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0")),
+                "Topic topic-0 only has foo=bar in image, should overwrite the ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1")),
+                "Topic topic-1 config is the same in image, so no write should happen");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-2")),
+                "Topic topic-2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_TOPIC_CONFIG));
+        assertEquals(1, opCounts.get(DELETE_TOPIC_CONFIG));
+    }
+
+    @Test
+    public void testInvalidConfigSnapshot() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType((byte) 99).setResourceName("resource").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        assertThrows(RuntimeException.class, () -> writer.handleConfigsSnapshot(image, consumer),
+            "Should throw due to invalid resource in image");
+    }
+
+    @Test
+    public void testProducerIdSnapshot() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        migrationClient.setReadProducerId(new ProducerIdsBlock(0, 100L, 1000));
+
+        {
+            // No change
+            ProducerIdsImage image = new ProducerIdsImage(1100);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(0, opCounts.size());
+        }
+
+        {
+            // KRaft differs from ZK
+            ProducerIdsImage image = new ProducerIdsImage(2000);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(1, opCounts.size());
+            assertEquals(2000, migrationClient.capturedProducerId);
+        }
+
+        {
+            // "Empty" state in ZK (shouldn't really happen, but good to check)
+            ProducerIdsImage image = new ProducerIdsImage(2000);
+            migrationClient.setReadProducerId(ProducerIdsBlock.EMPTY);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(1, opCounts.size());
+            assertEquals(2000, migrationClient.capturedProducerId);
+        }
+
+        {
+            // No state in ZK
+            ProducerIdsImage image = new ProducerIdsImage(2000);
+            migrationClient.setReadProducerId(null);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(1, opCounts.size());
+            assertEquals(2000, migrationClient.capturedProducerId);
+        }
+    }
+
+    @Test
+    public void testProducerIdDelta() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        migrationClient.setReadProducerId(new ProducerIdsBlock(0, 100L, 1000));
+
+        // The delta carries a larger next producer ID than the block read from ZK, so one write is expected
+        ProducerIdsDelta delta = new ProducerIdsDelta(ProducerIdsImage.EMPTY);
+        delta.replay(new ProducerIdsRecord().setBrokerId(0).setBrokerEpoch(20).setNextProducerId(2000));
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleProducerIdDelta(delta, consumer);
+        assertEquals(1, opCounts.size());
+        assertEquals(2000, migrationClient.capturedProducerId);
+    }
+
+    @Test
+    public void testAclSnapshot() {
+        ResourcePattern resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + Uuid.randomUuid(), PatternType.LITERAL);
+        ResourcePattern resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + Uuid.randomUuid(), PatternType.LITERAL);
+        ResourcePattern resource3 = new ResourcePattern(ResourceType.TOPIC, "baz-" + Uuid.randomUuid(), PatternType.LITERAL);
+
+        KafkaPrincipal principal1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice");
+        KafkaPrincipal principal2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob");
+        AccessControlEntry acl1Resource1 = new AccessControlEntry(principal1.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW);
+        AccessControlEntry acl1Resource2 = new AccessControlEntry(principal2.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW);
+
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient() {
+            @Override
+            public void iterateAcls(BiConsumer<ResourcePattern, Set<AccessControlEntry>> aclConsumer) {
+                aclConsumer.accept(resource1, Collections.singleton(acl1Resource1));
+                aclConsumer.accept(resource2, Collections.singleton(acl1Resource2));
+            }
+        };
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setAclMigrationClient(aclClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        // Create an ACL for a new resource.
+        AclsDelta delta = new AclsDelta(AclsImage.EMPTY);
+        AccessControlEntryRecord acl1Resource3 = new AccessControlEntryRecord()
+            .setId(Uuid.randomUuid())
+            .setHost("192.168.10.1")
+            .setOperation(AclOperation.READ.code())
+            .setPrincipal("*")
+            .setPermissionType(AclPermissionType.ALLOW.code())
+            .setPatternType(resource3.patternType().code())
+            .setResourceName(resource3.name())
+            .setResourceType(resource3.resourceType().code());
+        // The equivalent ACE
+        AccessControlEntry ace1Resource3 = new AccessControlEntry("*", "192.168.10.1", AclOperation.READ, AclPermissionType.ALLOW);
+        delta.replay(acl1Resource3);
+
+        // Change an ACL for an existing resource.
+        AccessControlEntryRecord acl2Resource1 = new AccessControlEntryRecord()
+            .setId(Uuid.randomUuid())
+            .setHost("192.168.15.1")
+            .setOperation(AclOperation.WRITE.code())
+            .setPrincipal(principal1.toString())
+            .setPermissionType(AclPermissionType.ALLOW.code())
+            .setPatternType(resource1.patternType().code())
+            .setResourceName(resource1.name())
+            .setResourceType(resource1.resourceType().code());
+        // The equivalent ACE
+        AccessControlEntry ace1Resource1 = new AccessControlEntry(principal1.toString(), "192.168.15.1", AclOperation.WRITE, AclPermissionType.ALLOW);
+        delta.replay(acl2Resource1);
+
+        // Do not add anything for resource 2 in the delta.
+        AclsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleAclsSnapshot(image, consumer);
+
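+        // resource2 exists only in ZK and should be deleted; resource1 and resource3 should be rewritten to match the image.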
+        assertTrue(aclClient.deletedResources.contains(resource2));
+        assertEquals(Collections.singleton(ace1Resource1), aclClient.updatedResources.get(resource1));
+        assertEquals(Collections.singleton(ace1Resource3), aclClient.updatedResources.get(resource3));
+    }
+
+    byte[] randomBuffer(MockRandom random, int length) {

Review Comment:
   nit: private 
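
   A minimal sketch of the suggested change (assuming the helper is only used
   within this test class):

       // Hypothetical revision per the nit above: narrow the helper's visibility.
       private byte[] randomBuffer(MockRandom random, int length) {
           byte[] buf = new byte[length];
           random.nextBytes(buf);
           return buf;
       }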



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+    byte[] randomBuffer(MockRandom random, int length) {
+        byte[] buf = new byte[length];
+        random.nextBytes(buf);
+        return buf;
+    }
+
+    @Test
+    public void testClientQuotasSnapshot() {
+        List<ClientQuotaRecord.EntityData> user2Entity = Collections.singletonList(
+            new ClientQuotaRecord.EntityData()
+                .setEntityType("user").setEntityName("user2"));
+        List<ClientQuotaRecord.EntityData> ipEntity = Collections.singletonList(
+            new ClientQuotaRecord.EntityData()
+                .setEntityType("ip").setEntityName("127.0.0.1"));
+
+        MockRandom random = new MockRandom();
+        ScramCredential credential = new ScramCredential(
+            randomBuffer(random, 1024),
+            randomBuffer(random, 1024),
+            randomBuffer(random, 1024),
+            8192);
+
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateClientQuotas(ClientQuotaVisitor visitor) {
+                visitor.visitClientQuota(user2Entity, Collections.singletonMap("request_percentage", 48.48));
+                visitor.visitClientQuota(ipEntity, Collections.singletonMap("connection_creation_rate", 10.0));
+                visitor.visitScramCredential("alice", ScramMechanism.SCRAM_SHA_256, credential);
+            }
+        };
+
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setConfigMigrationClient(configClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+
+        ClientQuotasDelta clientQuotasDelta = new ClientQuotasDelta(ClientQuotasImage.EMPTY);
+        clientQuotasDelta.replay(new ClientQuotaRecord()
+            .setEntity(user2Entity)
+            .setKey("request_percentage")
+            .setValue(58.58)
+            .setRemove(false));
+        ClientQuotasImage clientQuotasImage = clientQuotasDelta.apply();
+
+        ScramDelta scramDelta = new ScramDelta(ScramImage.EMPTY);
+        scramDelta.replay(new UserScramCredentialRecord()
+            .setName("george")
+            .setMechanism(ScramMechanism.SCRAM_SHA_256.type())
+            .setSalt(credential.salt())
+            .setStoredKey(credential.storedKey())
+            .setServerKey(credential.serverKey())
+            .setIterations(credential.iterations()));
+        ScramImage scramImage = scramDelta.apply();
+
+        // Empty image, should remove things from ZK (write an empty map)
+        writer.handleClientQuotasSnapshot(ClientQuotasImage.EMPTY, ScramImage.EMPTY, consumer);
+        assertEquals(3, opCounts.remove("UpdateClientQuotas"));
+        assertEquals(0, opCounts.size());
+        assertEquals(
+            Collections.emptyMap(),
+            configClient.writtenQuotas.get(Collections.singletonMap("user", "user2")));
+        assertEquals(
+            Collections.emptyMap(),
+            configClient.writtenQuotas.get(Collections.singletonMap("user", "alice")));
+        assertEquals(
+            Collections.emptyMap(),
+            configClient.writtenQuotas.get(Collections.singletonMap("ip", "127.0.0.1")));
+
+        assertEquals(
+            Collections.emptyMap(),
+            configClient.writtenQuotas.get(Collections.singletonMap("user", "user2")));
+
+        assertEquals(
+            Collections.emptyMap(),
+            configClient.writtenQuotas.get(Collections.singletonMap("user", "alice")));

Review Comment:
   dupe: these last two assertions repeat the `user2` and `alice` checks just above. Is this intentional?
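
   If it's unintended, the block would shrink to one emptiness assertion per quota entity, e.g. (sketch only):

       // one check per entity written during the empty-image snapshot
       assertEquals(
           Collections.emptyMap(),
           configClient.writtenQuotas.get(Collections.singletonMap("user", "user2")));
       assertEquals(
           Collections.emptyMap(),
           configClient.writtenQuotas.get(Collections.singletonMap("user", "alice")));
       assertEquals(
           Collections.emptyMap(),
           configClient.writtenQuotas.get(Collections.singletonMap("ip", "127.0.0.1")));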



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+        ClientQuotasDelta clientQuotasDelta = new ClientQuotasDelta(ClientQuotasImage.EMPTY);
+        clientQuotasDelta.replay(new ClientQuotaRecord()
+            .setEntity(user2Entity)
+            .setKey("request_percentage")
+            .setValue(58.58)
+            .setRemove(false));
+        ClientQuotasImage clientQuotasImage = clientQuotasDelta.apply();
+
+        ScramDelta scramDelta = new ScramDelta(ScramImage.EMPTY);
+        scramDelta.replay(new UserScramCredentialRecord()
+            .setName("george")
+            .setMechanism(ScramMechanism.SCRAM_SHA_256.type())
+            .setSalt(credential.salt())
+            .setStoredKey(credential.storedKey())
+            .setServerKey(credential.serverKey())
+            .setIterations(credential.iterations()));
+        ScramImage scramImage = scramDelta.apply();

Review Comment:
   nit: can we move these down to just above where they're needed? e.g. near `// with only client quota image` and `// with only scram image`
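
   Concretely, the move would look something like this (a sketch; it assumes the `// with only client quota image` section exercises `handleClientQuotasSnapshot` with the non-empty quota image, which isn't shown in this hunk):

       // with only client quota image
       ClientQuotasDelta clientQuotasDelta = new ClientQuotasDelta(ClientQuotasImage.EMPTY);
       clientQuotasDelta.replay(new ClientQuotaRecord()
           .setEntity(user2Entity)
           .setKey("request_percentage")
           .setValue(58.58)
           .setRemove(false));
       ClientQuotasImage clientQuotasImage = clientQuotasDelta.apply();
       writer.handleClientQuotasSnapshot(clientQuotasImage, ScramImage.EMPTY, consumer);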





[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "vvcephei (via GitHub)" <gi...@apache.org>.
vvcephei commented on PR #13802:
URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578973821

   retest this




[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1223265946


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+    @Test
+    public void testPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)).setLeader(1));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(1, opCounts.remove("UpdatePartitions"));

Review Comment:
   `TopicMigrationClient#updateTopicPartitions` takes a map of all the partition states to update for a topic, so it's just one "operation" from the client's perspective (even though it results in multiple ZK writes).
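
   In other words, both changed partitions travel in one call of roughly this shape (illustrative only, not the exact signature):

       // a single "UpdatePartitions" op: topic -> partitionId -> new state for every changed partition
       Map<String, Map<Integer, PartitionRegistration>> changes =
           Collections.singletonMap("spam", changedPartitions); // hypothetical map holding partitions 0 and 1
       topicClient.updateTopicPartitions(changes, state);       // one client call, several ZK writes underneath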





[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1219834583


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -223,16 +223,16 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
             );
             ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
             operationConsumer.accept(
-                UPDATE_TOPIC_CONFIG,
-                "Updating Configs for Topic " + topicName + ", ID " + topicId,
+                DELETE_TOPIC_CONFIG,

Review Comment:
   Kind of. The opType is just used in logging, so there was a bug in that regard.
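
   For reference, the delete path now reports itself under the delete opType, along these lines (a sketch; the log text and callback body are assumed, not copied from the patch):

       operationConsumer.accept(
           DELETE_TOPIC_CONFIG,                                    // was UPDATE_TOPIC_CONFIG, i.e. a wrong log/count key
           "Deleting configs for topic " + topicName,              // assumed wording
           state -> configClient.deleteConfigs(resource, state));  // assumed callback body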





Re: [PR] MINOR: Improve KRaftMigrationZkWriter test coverage [kafka]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13802:
URL: https://github.com/apache/kafka/pull/13802#issuecomment-1846498639

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.




[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1228123874


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            Uuid newTopicId = Uuid.randomUuid();
+            topicsDelta.replay(new TopicRecord().setTopicId(newTopicId).setName("new"));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(0).setReplicas(Arrays.asList(0, 1, 2)));
+            topicsDelta.replay(new PartitionRecord().setTopicId(newTopicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("CreateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testNewPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionRecord().setTopicId(topicId).setPartitionId(2).setReplicas(Arrays.asList(1, 2, 3)));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(0, opCounts.size());
+        });
+    }
+
+    @Test
+    public void testPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)).setLeader(1));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(1, opCounts.remove("UpdatePartitions"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(2, topicClient.updatedTopics.get("spam").size());
+            assertEquals(new HashSet<>(Arrays.asList(0, 1)), topicClient.updatedTopicPartitions.get("spam"));
+        });
+    }
+
+    @Test
+    public void testDeleteTopicDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new RemoveTopicRecord().setTopicId(topicId));
+            TopicsImage newTopicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            Map<Uuid, String> emptyTopicNames = Collections.emptyMap();
+            assertThrows(RuntimeException.class,
+                () -> writer.handleTopicsDelta(emptyTopicNames::get, newTopicsImage, topicsDelta, consumer));
+
+            Map<Uuid, String> topicNames = Collections.singletonMap(topicId, "spam");
+            writer.handleTopicsDelta(topicNames::get, newTopicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("DeleteTopic"));
+            assertEquals(0, opCounts.size());
+
+            assertEquals(Collections.singletonList("spam"), topicClient.deletedTopics);
+        });
+    }
+
+    @Test
+    public void testBrokerConfigDelta() {
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setConfigMigrationClient(configClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("b0").setName("spam").setValue(null));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue(null));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsDelta(image, delta, consumer);
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "b0"))
+        );
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0"))
+        );
+        assertTrue(
+            configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1"))
+        );
+    }
+
+    @Test
+    public void testBrokerConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> b0 = new HashMap<>();
+                b0.put("foo", "bar");
+                b0.put("spam", "eggs");
+                configConsumer.accept("0", b0);
+                configConsumer.accept("1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+                .setBrokersInZk(0)
+                .setTopicMigrationClient(topicClient)
+                .setConfigMigrationClient(configClient)
+                .setAclMigrationClient(aclClient)
+                .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.BROKER, "3")),
+            "Broker 3 is not in the ConfigurationsImage, it should get deleted");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "0")),
+            "Broker 0 only has foo=bar in image, should overwrite the ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new ConfigResource(ConfigResource.Type.BROKER, "1")),
+            "Broker 1 config is the same in image, so no write should happen");
+
+        assertEquals(
+            Collections.singletonMap("foo", "bar"),
+            configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, "2")),
+            "Broker 2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_BROKER_CONFIG));
+        assertEquals(1, opCounts.get(DELETE_BROKER_CONFIG));
+    }
+
+    @Test
+    public void testTopicConfigSnapshot() {
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient();
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient() {
+            @Override
+            public void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+                Map<String, String> topic0 = new HashMap<>();
+                topic0.put("foo", "bar");
+                topic0.put("spam", "eggs");
+                configConsumer.accept("topic-0", topic0);
+                configConsumer.accept("topic-1", Collections.singletonMap("foo", "bar"));
+                configConsumer.accept("topic-3", Collections.singletonMap("foo", "bar"));
+            }
+        };
+        CapturingAclMigrationClient aclClient = new CapturingAclMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .setAclMigrationClient(aclClient)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-0").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-1").setName("foo").setValue("bar"));
+        delta.replay(new ConfigRecord().setResourceType(ConfigResource.Type.TOPIC.id()).setResourceName("topic-2").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        writer.handleConfigsSnapshot(image, consumer);
+
+        assertTrue(configClient.deletedResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, "topic-3")),
+                "Topic topic-3 is not in the ConfigurationsImage, it should get deleted");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-0")),
+                "Topic topic-0 only has foo=bar in image, should overwrite the ZK config");
+
+        assertFalse(configClient.writtenConfigs.containsKey(new ConfigResource(ConfigResource.Type.TOPIC, "topic-1")),
+                "Topic topic-1 config is the same in image, so no write should happen");
+
+        assertEquals(
+                Collections.singletonMap("foo", "bar"),
+                configClient.writtenConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, "topic-2")),
+                "Topic topic-2 not present in ZK, should see an update");
+
+        assertEquals(2, opCounts.get(UPDATE_TOPIC_CONFIG));
+        assertEquals(1, opCounts.get(DELETE_TOPIC_CONFIG));
+    }
+
+    @Test
+    public void testInvalidConfigSnapshot() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+        ConfigurationsDelta delta = new ConfigurationsDelta(ConfigurationsImage.EMPTY);
+        delta.replay(new ConfigRecord().setResourceType((byte) 99).setResourceName("resource").setName("foo").setValue("bar"));
+
+        ConfigurationsImage image = delta.apply();
+        Map<String, Integer> opCounts = new HashMap<>();
+        KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+            (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+        assertThrows(RuntimeException.class, () -> writer.handleConfigsSnapshot(image, consumer),
+            "Should throw due to invalid resource in image");
+    }
+
+    @Test
+    public void testProducerIdSnapshot() {
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0)
+            .build();
+        KRaftMigrationZkWriter writer = new KRaftMigrationZkWriter(migrationClient);
+
+        migrationClient.setReadProducerId(new ProducerIdsBlock(0, 100L, 1000));
+
+        {
+            // No change
+            ProducerIdsImage image = new ProducerIdsImage(1100);
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                    (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleProducerIdSnapshot(image, consumer);
+            assertEquals(0, opCounts.size());
+        }
+
+        {
+            // KRaft differs from ZK

Review Comment:
   We don't actually verify this at this point. Once something is committed to KRaft, it's too late for validation. I'll add a case for a KRaft producer ID that is less than the ZK one.
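   
   A sketch of what that extra case might look like, following the pattern of the blocks above (the exact behavior on a stale KRaft block is an assumption here, since the quoted hunk cuts off before the mismatch cases):
   ```
   {
       // Hypothetical: KRaft's producer ID image is behind the block stored in ZK.
       // ZK block 0 spans IDs 100..1100, but KRaft only knows about 1000.
       ProducerIdsImage image = new ProducerIdsImage(1000);
       Map<String, Integer> opCounts = new HashMap<>();
       KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
           (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
       // Depending on the final design this could either overwrite ZK or fail fast;
       // the extra test case would pin that behavior down.
       writer.handleProducerIdSnapshot(image, consumer);
   }
   ```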





[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "vvcephei (via GitHub)" <gi...@apache.org>.
vvcephei commented on PR #13802:
URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578974789

   retest this, please




[GitHub] [kafka] ahuang98 commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "ahuang98 (via GitHub)" <gi...@apache.org>.
ahuang98 commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1218675188


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -267,6 +267,9 @@ void handleTopicsDelta(
     ) {
         topicsDelta.deletedTopicIds().forEach(topicId -> {
             String name = deletedTopicNameResolver.apply(topicId);
+            if (name == null) {

Review Comment:
   curious if we hit a bug which resulted in this check as well?



##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+
+    @Test
+    public void testUpdatePartitionsFromSnapshot() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsSnapshot(topicsImage, consumer);
+            assertEquals(0, opCounts.size(), "No operations expected since the data is the same");
+
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));

Review Comment:
   nit: add comment, something like `// no-op` above
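   
   Applied to the test above, the suggestion might read (a sketch of the nit, not the committed change):
   ```
   TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
   topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(2, 3)));
   // no-op: partition 1 already has replicas [3, 4, 5] with leader 3 in the image,
   // so this replay changes nothing and should produce no ZK write on its own
   topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
   ```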



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -223,16 +223,16 @@ public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistrat
             );
             ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
             operationConsumer.accept(
-                UPDATE_TOPIC_CONFIG,
-                "Updating Configs for Topic " + topicName + ", ID " + topicId,
+                DELETE_TOPIC_CONFIG,

Review Comment:
   did the new tests catch this bug?



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -475,12 +478,13 @@ void handleProducerIdSnapshot(ProducerIdsImage image, KRaftMigrationOperationCon
     void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) {
         Set<ConfigResource> updatedResources = configsDelta.changes().keySet();
         updatedResources.forEach(configResource -> {
+            String opType = brokerOrTopicOpType(configResource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
             Map<String, String> props = configsImage.configMapForResource(configResource);
             if (props.isEmpty()) {
-                operationConsumer.accept("DeleteConfig", "Delete configs for " + configResource, migrationState ->
+                operationConsumer.accept(opType, "Delete configs for " + configResource, migrationState ->

Review Comment:
   do we need to set `opType` within the if/else branches instead? e.g.
   ```
   if (props.isEmpty()) {
     String opType = brokerOrTopicOpType(configResource, DELETE_BROKER_CONFIG, DELETE_TOPIC_CONFIG);
     operationConsumer.accept(opType, "Delete configs for "....)
   } else {
     String opType = brokerOrTopicOpType(configResource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
     operationConsumer.accept(opType, "Update configs for "....)
   }
   ```





[GitHub] [kafka] vvcephei commented on pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "vvcephei (via GitHub)" <gi...@apache.org>.
vvcephei commented on PR #13802:
URL: https://github.com/apache/kafka/pull/13802#issuecomment-1578973157

   I'm going to try something out for triggering builds...




[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1219833727


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -267,6 +267,9 @@ void handleTopicsDelta(
     ) {
         topicsDelta.deletedTopicIds().forEach(topicId -> {
             String name = deletedTopicNameResolver.apply(topicId);
+            if (name == null) {

Review Comment:
   I ran into an NPE here in my test code when I passed an empty map.
   
   In the production code, the function passed in here is the getter of the `topicsById` map in TopicsImage. I think it's impossible for there to be something in `deletedTopicIds` that's not also in `topicsById`, so this check isn't strictly necessary, but the custom RuntimeException is better than an NPE.
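   
   For reference, a minimal sketch of the guard under discussion (the exception message is illustrative, not the exact text from the PR):
   ```
   topicsDelta.deletedTopicIds().forEach(topicId -> {
       String name = deletedTopicNameResolver.apply(topicId);
       if (name == null) {
           // Fail fast with a descriptive error rather than an NPE further down
           throw new RuntimeException("Cannot resolve name for deleted topic id " + topicId);
       }
       // ... proceed with deleting the topic state in ZK
   });
   ```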





[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1219791584


##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##########
@@ -475,12 +478,13 @@ void handleProducerIdSnapshot(ProducerIdsImage image, KRaftMigrationOperationCon
     void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) {
         Set<ConfigResource> updatedResources = configsDelta.changes().keySet();
         updatedResources.forEach(configResource -> {
+            String opType = brokerOrTopicOpType(configResource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
             Map<String, String> props = configsImage.configMapForResource(configResource);
             if (props.isEmpty()) {
-                operationConsumer.accept("DeleteConfig", "Delete configs for " + configResource, migrationState ->
+                operationConsumer.accept(opType, "Delete configs for " + configResource, migrationState ->

Review Comment:
   Yup, good catch





[GitHub] [kafka] ahuang98 commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

Posted by "ahuang98 (via GitHub)" <gi...@apache.org>.
ahuang98 commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1223387331


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##########
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
+
+    @Test
+    public void testPartitionDelta() {
+        setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, writer) -> {
+            TopicsDelta topicsDelta = new TopicsDelta(topicsImage);
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(0).setReplicas(Arrays.asList(3, 4, 5)).setLeader(3));
+            topicsDelta.replay(new PartitionChangeRecord().setTopicId(topicId).setPartitionId(1).setReplicas(Arrays.asList(1, 2, 3)).setLeader(1));
+            topicsImage = topicsDelta.apply();
+
+            Map<String, Integer> opCounts = new HashMap<>();
+            KRaftMigrationOperationConsumer consumer = KRaftMigrationDriver.countingOperationConsumer(opCounts,
+                (logMsg, operation) -> operation.apply(ZkMigrationLeadershipState.EMPTY));
+            writer.handleTopicsDelta(__ -> "", topicsImage, topicsDelta, consumer);
+            assertEquals(1, opCounts.remove("UpdateTopic"));
+            assertEquals(1, opCounts.remove("UpdatePartitions"));

Review Comment:
   Got it, so it's per-topic. There's a reason for the `s` in `UpdatePartitions` 😆


