Posted to commits@kafka.apache.org by da...@apache.org on 2023/05/31 16:52:46 UTC

[kafka] branch 3.5 updated: KAFKA-15004: Fix configuration dual-write during migration (#13767)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.5 by this push:
     new ddff9d5ecc3 KAFKA-15004: Fix configuration dual-write during migration (#13767)
ddff9d5ecc3 is described below

commit ddff9d5ecc32b6f3e8a4ad4505544c5e6284c683
Author: David Arthur <mu...@gmail.com>
AuthorDate: Sat May 27 17:20:44 2023 -0400

    KAFKA-15004: Fix configuration dual-write during migration (#13767)
    
    This patch fixes several small bugs in configuration dual-write during migration:
    
    * Topic configs were not written back to ZK while handling a snapshot.
    * New broker/topic configs in KRaft that did not exist in ZK were not written to ZK.
    * Sensitive configs were not encoded when written to ZooKeeper.
    * Topic configs are now handled in ConfigMigrationClient and KRaftMigrationZkWriter#handleConfigsSnapshot.
    
    Added tests to ensure the above-mentioned issues no longer occur. A minimal
    sketch of the fixed encode-on-write behavior follows the diffstat below.
    
    Co-authored-by: Akhilesh Chaganti <ak...@users.noreply.github.com>
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |  72 ++++++------
 .../zk/migration/ZkConfigMigrationClient.scala     |  32 +++++-
 .../zk/migration/ZkTopicMigrationClient.scala      |   7 --
 .../zk/migration/ZkConfigMigrationClientTest.scala |  18 +++
 .../kafka/zk/migration/ZkMigrationClientTest.scala | 125 +++++++++++++++++++--
 .../zk/migration/ZkMigrationTestHarness.scala      |   2 +
 .../metadata/migration/ConfigMigrationClient.java  |   5 +
 .../metadata/migration/KRaftMigrationDriver.java   |   2 +-
 .../metadata/migration/KRaftMigrationZkWriter.java |  48 ++++++--
 .../metadata/migration/TopicMigrationClient.java   |   7 +-
 .../migration/CapturingConfigMigrationClient.java  |  11 ++
 11 files changed, 257 insertions(+), 72 deletions(-)
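
Before the diffs, here is the core rule the encoding fix enforces: sensitive
config values are encoded on the way into ZooKeeper and decoded on the way
out. The Scala sketch below illustrates that round trip under stated
assumptions: the Encoder trait and the isPasswordConfig predicate are trivial
stand-ins for Kafka's PasswordEncoder and DynamicBrokerConfig.isPasswordConfig,
and the string-reversal "encoding" is a placeholder, not real cryptography.

// Minimal sketch of the encode-on-write / decode-on-read rule; all types here
// are illustrative stand-ins, not Kafka's actual classes.
object DualWriteSketch {
  trait Encoder {
    def encode(clear: String): String
    def decode(encoded: String): String
  }
  // Placeholder "encoding" only; the real PasswordEncoder encrypts.
  val encoder: Encoder = new Encoder {
    def encode(clear: String): String = clear.reverse
    def decode(encoded: String): String = encoded.reverse
  }

  // Stand-in for DynamicBrokerConfig.isPasswordConfig.
  def isPasswordConfig(key: String): Boolean = key.endsWith("password")

  // Write path: sensitive values are encoded before they reach ZK.
  def toZk(configs: Map[String, String]): Map[String, String] =
    configs.map { case (k, v) => k -> (if (isPasswordConfig(k)) encoder.encode(v) else v) }

  // Read path: encoded values are decoded before becoming ConfigRecords.
  def fromZk(zkConfigs: Map[String, String]): Map[String, String] =
    zkConfigs.map { case (k, v) => k -> (if (isPasswordConfig(k)) encoder.decode(v) else v) }

  def main(args: Array[String]): Unit = {
    val configs = Map("retention.ms" -> "300000", "ssl.keystore.password" -> "secret")
    val persisted = toZk(configs)
    assert(persisted("ssl.keystore.password") != "secret") // never stored in the clear
    assert(fromZk(persisted) == configs)                   // round-trips losslessly
  }
}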

diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index f3e21e72844..e94f435d71b 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -37,7 +37,6 @@ import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
 
 import java.{lang, util}
-import java.util.Properties
 import java.util.function.Consumer
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -145,44 +144,47 @@ class ZkMigrationClient(
     topicClient.iterateTopics(
       util.EnumSet.allOf(classOf[TopicVisitorInterest]),
       new TopicVisitor() {
-      override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
-        if (!topicBatch.isEmpty) {
-          recordConsumer.accept(topicBatch)
-          topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-        }
+        override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
+          if (!topicBatch.isEmpty) {
+            recordConsumer.accept(topicBatch)
+            topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+          }
 
-        topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
-          .setName(topicName)
-          .setTopicId(topicId), 0.toShort))
-      }
+          topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+            .setName(topicName)
+            .setTopicId(topicId), 0.toShort))
 
-      override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
-        val record = new PartitionRecord()
-          .setTopicId(topicIdPartition.topicId())
-          .setPartitionId(topicIdPartition.partition())
-          .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
-          .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
-          .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
-          .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
-          .setLeader(partitionRegistration.leader)
-          .setLeaderEpoch(partitionRegistration.leaderEpoch)
-          .setPartitionEpoch(partitionRegistration.partitionEpoch)
-          .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
-        partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
-        partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
-        topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
-      }
+          // This breaks the abstraction a bit, but the topic configs belong in the topic batch
+          // when migrating topics, and the logic for reading configs lives elsewhere.
+          configClient.readTopicConfigs(topicName, (topicConfigs: util.Map[String, String]) => {
+            topicConfigs.forEach((key: Any, value: Any) => {
+              topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+                .setResourceType(ConfigResource.Type.TOPIC.id)
+                .setResourceName(topicName)
+                .setName(key.toString)
+                .setValue(value.toString), 0.toShort))
+            })
+          })
+        }
 
-      override def visitConfigs(topicName: String, topicProps: Properties): Unit = {
-        topicProps.forEach((key: Any, value: Any) => {
-          topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
-            .setResourceType(ConfigResource.Type.TOPIC.id)
-            .setResourceName(topicName)
-            .setName(key.toString)
-            .setValue(value.toString), 0.toShort))
-        })
+        override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
+          val record = new PartitionRecord()
+            .setTopicId(topicIdPartition.topicId())
+            .setPartitionId(topicIdPartition.partition())
+            .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
+            .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
+            .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
+            .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
+            .setLeader(partitionRegistration.leader)
+            .setLeaderEpoch(partitionRegistration.leaderEpoch)
+            .setPartitionEpoch(partitionRegistration.partitionEpoch)
+            .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
+          partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
+          partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
+          topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
+        }
       }
-    })
+    )
 
     if (!topicBatch.isEmpty) {
       recordConsumer.accept(topicBatch)
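
The comment in visitTopic above notes that topic configs now ride in the same
batch as the topic's own records. A minimal sketch of that batching pattern,
with stand-in record types and canned config data (nothing below is Kafka's
real API):

object TopicBatchSketch {
  sealed trait Record
  case class TopicRecord(name: String) extends Record
  case class ConfigRecord(topic: String, key: String, value: String) extends Record

  // Stand-in for readTopicConfigs: hands the configs to a callback rather than
  // returning them, mirroring the Consumer-based shape in the diff.
  def readTopicConfigs(topic: String)(consumer: Map[String, String] => Unit): Unit =
    consumer(Map("flush.ms" -> "60000", "retention.ms" -> "300000")) // canned data

  def visitTopic(topic: String, batch: scala.collection.mutable.Buffer[Record]): Unit = {
    batch += TopicRecord(topic)
    // Configs are appended to the same batch, so a consumer of the batch sees
    // the topic and its configs together.
    readTopicConfigs(topic) { configs =>
      configs.foreach { case (k, v) => batch += ConfigRecord(topic, k, v) }
    }
  }

  def main(args: Array[String]): Unit = {
    val batch = scala.collection.mutable.Buffer.empty[Record]
    visitTopic("testTopic", batch)
    batch.foreach(println) // TopicRecord first, then its ConfigRecords
  }
}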
diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
index dbcc1d99b93..55fb048e686 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
@@ -23,6 +23,7 @@ import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
 import kafka.zk._
 import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
 import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
@@ -35,7 +36,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException}
 
 import java.{lang, util}
 import java.util.Properties
-import java.util.function.BiConsumer
+import java.util.function.{BiConsumer, Consumer}
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
@@ -145,6 +146,28 @@ class ZkConfigMigrationClient(
     }
   }
 
+  override def iterateTopicConfigs(configConsumer: BiConsumer[String, util.Map[String, String]]): Unit = {
+    val topicEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Topic)
+    topicEntities.foreach { topic =>
+      readTopicConfigs(topic, props => configConsumer.accept(topic, props))
+    }
+  }
+
+  override def readTopicConfigs(topicName: String, configConsumer: Consumer[util.Map[String, String]]): Unit = {
+    val topicResource = fromZkEntityName(topicName)
+    val props = zkClient.getEntityConfigs(ConfigType.Topic, topicResource)
+    val decodedProps = props.asScala.map { case (key, value) =>
+      if (DynamicBrokerConfig.isPasswordConfig(key))
+        key -> passwordEncoder.decode(value).value
+      else
+        key -> value
+    }.toMap.asJava
+
+    logAndRethrow(this, s"Error in topic config consumer. Topic was $topicResource.") {
+      configConsumer.accept(decodedProps)
+    }
+  }
+
   override def writeConfigs(
     configResource: ConfigResource,
     configMap: util.Map[String, String],
@@ -159,7 +182,12 @@ class ZkConfigMigrationClient(
     val configName = toZkEntityName(configResource.name())
     if (configType.isDefined) {
       val props = new Properties()
-      configMap.forEach { case (key, value) => props.put(key, value) }
+      configMap.forEach { case (key, value) =>
+        if (DynamicBrokerConfig.isPasswordConfig(key)) {
+          props.put(key, passwordEncoder.encode(new Password(value)))
+        } else
+          props.put(key, value)
+      }
       tryWriteEntityConfig(configType.get, configName, props, create = false, state) match {
         case Some(newState) =>
           newState
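
One design note on the new methods above: iterateTopicConfigs delegates to
readTopicConfigs per topic, so any decoding logic lives in exactly one place.
A small sketch of that composition, with an in-memory map standing in for ZK:

object ConfigClientSketch {
  // Stand-in for the topic configs stored in ZK.
  private val zkTopicConfigs = Map(
    "topicA" -> Map("retention.ms" -> "300000"),
    "topicB" -> Map("flush.ms" -> "60000")
  )

  def readTopicConfigs(topic: String)(consumer: Map[String, String] => Unit): Unit =
    consumer(zkTopicConfigs.getOrElse(topic, Map.empty)) // decoding would happen here

  // Iteration enumerates topics, then reuses the single-topic read path.
  def iterateTopicConfigs(consumer: (String, Map[String, String]) => Unit): Unit =
    zkTopicConfigs.keys.foreach(t => readTopicConfigs(t)(cfg => consumer(t, cfg)))

  def main(args: Array[String]): Unit =
    iterateTopicConfigs((topic, cfg) => println(s"$topic -> $cfg"))
}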
diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
index a51b7c808c2..37ceef13300 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
@@ -48,7 +48,6 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
       throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.")
     }
     val topics = zkClient.getAllTopicsInCluster()
-    val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
     val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
     replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
       val topicAssignment = partitionAssignments.map { case (partition, assignment) =>
@@ -91,12 +90,6 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
           }
         }
       }
-      if (interests.contains(TopicVisitorInterest.CONFIGS)) {
-        val props = topicConfigs(topic)
-        logAndRethrow(this, s"Error in topic config consumer. Topic was $topic.") {
-          visitor.visitConfigs(topic, props)
-        }
-      }
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
index 7313c321fe3..b9d86be25c4 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.server.util.MockRandom
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
+import java.util
 import java.util.Properties
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
@@ -68,6 +69,23 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
       }
     })
 
+    // Update the sensitive config value from the config client and check that the value
+    // persisted in Zookeeper is encrypted.
+    val newProps = new util.HashMap[String, String]()
+    newProps.put(KafkaConfig.DefaultReplicationFactorProp, "2") // normal config
+    newProps.put(KafkaConfig.SslKeystorePasswordProp, NEW_SECRET) // sensitive config
+    migrationState = migrationClient.configClient().writeConfigs(
+      new ConfigResource(ConfigResource.Type.BROKER, "1"), newProps, migrationState)
+    val actualPropsInZk = zkClient.getEntityConfigs(ConfigType.Broker, "1")
+    assertEquals(2, actualPropsInZk.size())
+    actualPropsInZk.forEach { case (key, value) =>
+      if (key == KafkaConfig.SslKeystorePasswordProp) {
+        assertEquals(NEW_SECRET, encoder.decode(value.toString).value)
+      } else {
+        assertEquals(newProps.get(key), value)
+      }
+    }
+
     migrationState = migrationClient.configClient().deleteConfigs(
       new ConfigResource(ConfigResource.Type.BROKER, "1"), migrationState)
     assertEquals(0, zkClient.getEntityConfigs(ConfigType.Broker, "1").size())
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
index 7d7dae9f893..22447ee0e17 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -17,14 +17,15 @@
 package kafka.zk.migration
 
 import kafka.api.LeaderAndIsr
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
 import kafka.coordinator.transaction.ProducerIdManager
-import kafka.zk.migration.ZkMigrationTestHarness
-import org.apache.kafka.common.config.TopicConfig
+import kafka.server.{ConfigType, KafkaConfig}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, ProducerIdsRecord}
+import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, PartitionRecord, ProducerIdsRecord, TopicRecord}
 import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
+import org.apache.kafka.metadata.migration.{KRaftMigrationZkWriter, ZkMigrationLeadershipState}
 import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
@@ -252,11 +253,115 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
       .map {_.message() }
       .filter(message => MetadataRecordType.fromId(message.apiKey()).equals(MetadataRecordType.CONFIG_RECORD))
       .map { _.asInstanceOf[ConfigRecord] }
-      .toSeq
+      .map { record => record.name() -> record.value()}
+      .toMap
     assertEquals(2, configs.size)
-    assertEquals(TopicConfig.FLUSH_MS_CONFIG, configs.head.name())
-    assertEquals("60000", configs.head.value())
-    assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
-    assertEquals("300000", configs.last.value())
+    assertTrue(configs.contains(TopicConfig.FLUSH_MS_CONFIG))
+    assertEquals("60000", configs(TopicConfig.FLUSH_MS_CONFIG))
+    assertTrue(configs.contains(TopicConfig.RETENTION_MS_CONFIG))
+    assertEquals("300000", configs(TopicConfig.RETENTION_MS_CONFIG))
+  }
+
+  @Test
+  def testTopicAndBrokerConfigsMigrationWithSnapshots(): Unit = {
+    val kraftWriter = new KRaftMigrationZkWriter(migrationClient, (_, operation) => {
+      migrationState = operation.apply(migrationState)
+    })
+
+    // Add some topics and broker configs and create a new image.
+    val topicName = "testTopic"
+    val partition = 0
+    val tp = new TopicPartition(topicName, partition)
+    val leaderPartition = 1
+    val leaderEpoch = 100
+    val partitionEpoch = 10
+    val brokerId = "1"
+    val replicas = List(1, 2, 3).map(int2Integer).asJava
+    val topicId = Uuid.randomUuid()
+    val props = new Properties()
+    props.put(KafkaConfig.DefaultReplicationFactorProp, "1") // normal config
+    props.put(KafkaConfig.SslKeystorePasswordProp, SECRET) // sensitive config
+
+    //    // Leave Zk in an incomplete state.
+    //    zkClient.createTopicAssignment(topicName, Some(topicId), Map(tp -> Seq(1)))
+
+    val delta = new MetadataDelta(MetadataImage.EMPTY)
+    delta.replay(new TopicRecord()
+      .setTopicId(topicId)
+      .setName(topicName)
+    )
+    delta.replay(new PartitionRecord()
+      .setTopicId(topicId)
+      .setIsr(replicas)
+      .setLeader(leaderPartition)
+      .setReplicas(replicas)
+      .setAddingReplicas(List.empty.asJava)
+      .setRemovingReplicas(List.empty.asJava)
+      .setLeaderEpoch(leaderEpoch)
+      .setPartitionEpoch(partitionEpoch)
+      .setPartitionId(partition)
+      .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+    )
+    // Use same props for the broker and topic.
+    props.asScala.foreach { case (key, value) =>
+      delta.replay(new ConfigRecord()
+        .setName(key)
+        .setValue(value)
+        .setResourceName(topicName)
+        .setResourceType(ConfigResource.Type.TOPIC.id())
+      )
+      delta.replay(new ConfigRecord()
+        .setName(key)
+        .setValue(value)
+        .setResourceName(brokerId)
+        .setResourceType(ConfigResource.Type.BROKER.id())
+      )
+    }
+    val image = delta.apply(MetadataProvenance.EMPTY)
+
+    // Handle migration using the generated snapshot.
+    kraftWriter.handleSnapshot(image)
+
+    // Verify topic state.
+    val topicIdReplicaAssignment =
+      zkClient.getReplicaAssignmentAndTopicIdForTopics(Set(topicName))
+    assertEquals(1, topicIdReplicaAssignment.size)
+    topicIdReplicaAssignment.foreach { assignment =>
+      assertEquals(topicName, assignment.topic)
+      assertEquals(Some(topicId), assignment.topicId)
+      assertEquals(Map(tp -> ReplicaAssignment(replicas.asScala.map(Integer2int).toSeq)),
+        assignment.assignment)
+    }
+
+    // Verify the topic partition states.
+    val topicPartitionState = zkClient.getTopicPartitionState(tp)
+    assertTrue(topicPartitionState.isDefined)
+    topicPartitionState.foreach { state =>
+      assertEquals(leaderPartition, state.leaderAndIsr.leader)
+      assertEquals(leaderEpoch, state.leaderAndIsr.leaderEpoch)
+      assertEquals(LeaderRecoveryState.RECOVERED, state.leaderAndIsr.leaderRecoveryState)
+      assertEquals(replicas.asScala.map(Integer2int).toList, state.leaderAndIsr.isr)
+    }
+
+    // Verify the broker and topic configs (including sensitive configs).
+    val brokerProps = zkClient.getEntityConfigs(ConfigType.Broker, brokerId)
+    val topicProps = zkClient.getEntityConfigs(ConfigType.Topic, topicName)
+    assertEquals(2, brokerProps.size())
+
+    brokerProps.asScala.foreach { case (key, value) =>
+      if (key == KafkaConfig.SslKeystorePasswordProp) {
+        assertEquals(SECRET, encoder.decode(value).value)
+      } else {
+        assertEquals(props.getProperty(key), value)
+      }
+    }
+
+    topicProps.asScala.foreach { case (key, value) =>
+      if (key == KafkaConfig.SslKeystorePasswordProp) {
+        assertEquals(SECRET, encoder.decode(value).value)
+      } else {
+        assertEquals(props.getProperty(key), value)
+      }
+    }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
index 321903ed9ce..d04798542ea 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
@@ -36,6 +36,8 @@ class ZkMigrationTestHarness extends QuorumTestHarness {
 
   val SECRET = "secret"
 
+  val NEW_SECRET = "newSecret"
+
   val encoder: PasswordEncoder = {
     val encoderProps = new Properties()
     encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java
index 9a7b486ff11..57f63fb3cce 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.security.scram.ScramCredential;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 public interface ConfigMigrationClient {
 
@@ -38,6 +39,10 @@ public interface ConfigMigrationClient {
 
     void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer);
 
+    void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer);
+
+    void readTopicConfigs(String topicName, Consumer<Map<String, String>> configConsumer);
+
     ZkMigrationLeadershipState writeConfigs(
         ConfigResource configResource,
         Map<String, String> configMap,
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 213d86ca4e8..3706c8d3617 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -154,7 +154,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
         log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
 
-        // Once we've recovered the migration state from ZK, install this class as a metadata published
+        // Once we've recovered the migration state from ZK, install this class as a metadata publisher
         // by calling the initialZkLoadHandler.
         initialZkLoadHandler.accept(this);
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index fc4d335b36e..bec062f5c98 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -98,7 +98,8 @@ public class KRaftMigrationZkWriter {
 
     /**
      * Handle a snapshot of the topic metadata. This requires scanning through all the topics and partitions
-     * in ZooKeeper to determine what has changed.
+     * in ZooKeeper to determine what has changed. Topic configs are not handled here since they exist in the
+     * ConfigurationsImage.
      */
     void handleTopicsSnapshot(TopicsImage topicsImage) {
         Map<Uuid, String> deletedTopics = new HashMap<>();
@@ -194,23 +195,48 @@ public class KRaftMigrationZkWriter {
     }
 
     void handleConfigsSnapshot(ConfigurationsImage configsImage) {
-        Set<ConfigResource> brokersToUpdate = new HashSet<>();
+        Set<ConfigResource> newResources = new HashSet<>();
+        configsImage.resourceData().keySet().forEach(resource -> {
+            if (EnumSet.of(ConfigResource.Type.BROKER, ConfigResource.Type.TOPIC).contains(resource.type())) {
+                newResources.add(resource);
+            } else {
+                throw new RuntimeException("Unknown config resource type " + resource.type());
+            }
+        });
+        Set<ConfigResource> resourcesToUpdate = new HashSet<>();
+        BiConsumer<ConfigResource, Map<String, String>> processConfigsForResource = (ConfigResource resource, Map<String, String> configs) -> {
+            newResources.remove(resource);
+            Map<String, String> kraftProps = configsImage.configMapForResource(resource);
+            if (!kraftProps.equals(configs)) {
+                resourcesToUpdate.add(resource);
+            }
+        };
+
         migrationClient.configClient().iterateBrokerConfigs((broker, configs) -> {
             ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, broker);
-            Map<String, String> kraftProps = configsImage.configMapForResource(brokerResource);
-            if (!kraftProps.equals(configs)) {
-                brokersToUpdate.add(brokerResource);
+            processConfigsForResource.accept(brokerResource, configs);
+        });
+        migrationClient.configClient().iterateTopicConfigs((topic, configs) -> {
+            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+            processConfigsForResource.accept(topicResource, configs);
+        });
+
+        newResources.forEach(resource -> {
+            Map<String, String> props = configsImage.configMapForResource(resource);
+            if (!props.isEmpty()) {
+                operationConsumer.accept("Create configs for " + resource.type().name() + " " + resource.name(),
+                    migrationState -> migrationClient.configClient().writeConfigs(resource, props, migrationState));
             }
         });
 
-        brokersToUpdate.forEach(brokerResource -> {
-            Map<String, String> props = configsImage.configMapForResource(brokerResource);
+        resourcesToUpdate.forEach(resource -> {
+            Map<String, String> props = configsImage.configMapForResource(resource);
             if (props.isEmpty()) {
-                operationConsumer.accept("Delete configs for broker " + brokerResource.name(), migrationState ->
-                    migrationClient.configClient().deleteConfigs(brokerResource, migrationState));
+                operationConsumer.accept("Delete configs for " + resource.type().name() + " " + resource.name(),
+                    migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState));
             } else {
-                operationConsumer.accept("Update configs for broker " + brokerResource.name(), migrationState ->
-                    migrationClient.configClient().writeConfigs(brokerResource, props, migrationState));
+                operationConsumer.accept("Update configs for " + resource.type().name() + " " + resource.name(),
+                    migrationState -> migrationClient.configClient().writeConfigs(resource, props, migrationState));
             }
         });
     }
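
The reworked handleConfigsSnapshot above reconciles three cases: resources
present only in KRaft (create), resources whose configs drifted (update), and
resources whose KRaft configs are now empty (delete). A compact sketch of that
set logic, with plain maps standing in for ConfigurationsImage and the ZK-side
iteration:

object ConfigReconcileSketch {
  type Resource = String // stand-in for ConfigResource, e.g. "broker:1", "topic:testTopic"

  // Returns (create, update, delete) given the KRaft image and the ZK contents.
  def reconcile(
    kraft: Map[Resource, Map[String, String]],
    zk: Map[Resource, Map[String, String]]
  ): (Set[Resource], Set[Resource], Set[Resource]) = {
    // Every KRaft resource starts out "new"; seeing it during the ZK scan removes it.
    var newResources = kraft.keySet
    var toUpdate = Set.empty[Resource]
    var toDelete = Set.empty[Resource]
    zk.foreach { case (resource, zkProps) =>
      newResources -= resource
      val kraftProps = kraft.getOrElse(resource, Map.empty)
      if (kraftProps.isEmpty) toDelete += resource         // absent from KRaft: delete in ZK
      else if (kraftProps != zkProps) toUpdate += resource // drifted: rewrite in ZK
    }
    (newResources, toUpdate, toDelete)
  }

  def main(args: Array[String]): Unit = {
    val kraft = Map("topic:t1" -> Map("flush.ms" -> "1"), "broker:1" -> Map("x" -> "2"))
    val zk    = Map("broker:1" -> Map("x" -> "1"), "broker:2" -> Map("y" -> "3"))
    println(reconcile(kraft, zk)) // (Set(topic:t1), Set(broker:1), Set(broker:2))
  }
}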
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
index e373d066f2c..e2fac2f1e5c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
@@ -24,23 +24,18 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 public interface TopicMigrationClient {
 
     enum TopicVisitorInterest {
         TOPICS,
-        PARTITIONS,
-        CONFIGS
+        PARTITIONS
     }
 
     interface TopicVisitor {
         void visitTopic(String topicName, Uuid topicId, Map<Integer, List<Integer>> assignments);
         default void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistration partitionRegistration) {
 
-        }
-        default void visitConfigs(String topicName, Properties topicProps) {
-
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java
index 19ed12a381b..6c1393dbc3e 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java
@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 public class CapturingConfigMigrationClient implements ConfigMigrationClient {
     public List<ConfigResource> deletedResources = new ArrayList<>();
@@ -44,6 +45,16 @@ public class CapturingConfigMigrationClient implements ConfigMigrationClient {
 
     }
 
+    @Override
+    public void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+
+    }
+
+    @Override
+    public void readTopicConfigs(String topicName, Consumer<Map<String, String>> configConsumer) {
+
+    }
+
     @Override
     public ZkMigrationLeadershipState writeConfigs(ConfigResource configResource, Map<String, String> configMap, ZkMigrationLeadershipState state) {
         writtenConfigs.put(configResource, configMap);