You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/05 08:35:36 UTC

[kafka] branch trunk updated: KAFKA-14840: Support for snapshots during ZK migration (#13461)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0822ce0ed1a KAFKA-14840: Support for snapshots during ZK migration (#13461)
0822ce0ed1a is described below

commit 0822ce0ed1a106a510930bc9ac53a266f54684d7
Author: David Arthur <mu...@gmail.com>
AuthorDate: Fri May 5 04:35:26 2023 -0400

    KAFKA-14840: Support for snapshots during ZK migration (#13461)
    
    This patch adds support for handling metadata snapshots while in dual-write mode. Prior to this change, if the active
    controller loaded a snapshot, it would get out of sync with the ZK state.
    
    In order to reconcile the snapshot state with ZK, several methods were added to scan through the metadata in ZK to
    compute differences with the MetadataImage. Since this introduced a lot of code, I opted to split out a lot of methods
    from ZkMigrationClient into their own client interfaces, such as TopicMigrationClient, ConfigMigrationClient, and
    AclMigrationClient. Each of these has some iterator method that lets the caller examine the ZK state in a single pass
    and without using too much memory.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Luke Chen <sh...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../main/scala/kafka/server/ControllerServer.scala |   2 +-
 .../main/scala/kafka/zk/ZkMigrationClient.scala    | 595 +++++----------------
 .../kafka/zk/migration/ZkAclMigrationClient.scala  | 117 ++++
 .../zk/migration/ZkConfigMigrationClient.scala     | 322 +++++++++++
 .../zk/migration/ZkTopicMigrationClient.scala      | 253 +++++++++
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |  12 +-
 .../unit/kafka/zk/ZkMigrationClientTest.scala      | 555 -------------------
 .../zk/migration/ZkAclMigrationClientTest.scala    | 118 ++++
 .../zk/migration/ZkConfigMigrationClientTest.scala | 244 +++++++++
 .../kafka/zk/migration/ZkMigrationClientTest.scala | 262 +++++++++
 .../zk/migration/ZkMigrationTestHarness.scala      |  64 +++
 .../java/org/apache/kafka/image/AclsDelta.java     |  14 +
 .../apache/kafka/image/ConfigurationsImage.java    |   2 +-
 .../metadata/migration/AclMigrationClient.java     |  40 ++
 .../metadata/migration/ConfigMigrationClient.java  |  58 ++
 .../metadata/migration/KRaftMigrationDriver.java   | 204 ++-----
 .../migration/KRaftMigrationOperation.java         |  23 +
 .../metadata/migration/KRaftMigrationZkWriter.java | 418 +++++++++++++++
 .../kafka/metadata/migration/MigrationClient.java  |  48 +-
 .../metadata/migration/TopicMigrationClient.java   |  65 +++
 .../migration/ZkMigrationLeadershipState.java      |  19 +-
 .../org/apache/kafka/image/TopicsImageTest.java    |   4 +-
 .../migration/CapturingAclMigrationClient.java     |  56 ++
 .../migration/CapturingConfigMigrationClient.java  |  65 +++
 .../migration/CapturingMigrationClient.java        | 148 +++++
 .../migration/CapturingTopicMigrationClient.java   |  66 +++
 .../migration/KRaftMigrationDriverTest.java        | 341 ++++++------
 .../tests/core/zookeeper_migration_test.py         |   2 +
 29 files changed, 2725 insertions(+), 1394 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 67f16f7d766..9cbb54b5cf3 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -300,7 +300,7 @@
 
     <!-- metadata -->
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest|ClusterControlManagerTest).java"/>
+              files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest|ClusterControlManagerTest|KRaftMigrationDriverTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 01f76b220a7..8832e8878aa 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -259,7 +259,7 @@ class ControllerServer(
             config.passwordEncoderIterations)
           case None => PasswordEncoder.noop()
         }
-        val migrationClient = new ZkMigrationClient(zkClient, zkConfigEncoder)
+        val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
         val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
         val migrationDriver = new KRaftMigrationDriver(
           config.nodeId,
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 54e0170a71f..f3e21e72844 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -16,49 +16,45 @@
  */
 package kafka.zk
 
-import kafka.api.LeaderAndIsr
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
-import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminManager}
 import kafka.utils.{Logging, PasswordEncoder}
-import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.wrapZkException
+import kafka.zk.migration.{ZkAclMigrationClient, ZkConfigMigrationClient, ZkTopicMigrationClient}
 import kafka.zookeeper._
 import org.apache.kafka.clients.admin.ScramMechanism
 import org.apache.kafka.common.acl.AccessControlEntry
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
 import org.apache.kafka.common.metadata._
-import org.apache.kafka.common.quota.ClientQuotaEntity
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.kafka.common.security.scram.ScramCredential
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
+import org.apache.kafka.metadata.migration.TopicMigrationClient.{TopicVisitor, TopicVisitorInterest}
+import org.apache.kafka.metadata.migration._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock}
-import org.apache.zookeeper.KeeperException.{AuthFailedException, Code, NoAuthException, SessionClosedRequireAuthException}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}
 
-import java.util
+import java.{lang, util}
 import java.util.Properties
-import java.util.function.{BiConsumer, Consumer}
+import java.util.function.Consumer
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationClient {
+
   val MaxBatchSize = 100
-}
 
-/**
- * Migration client in KRaft controller responsible for handling communication to Zookeeper and
- * the ZkBrokers present in the cluster. Methods that directly use KafkaZkClient should use the wrapZkException
- * wrapper function in order to translate KeeperExceptions into something usable by the caller.
- */
-class ZkMigrationClient(
-  zkClient: KafkaZkClient,
-  zkConfigEncoder: PasswordEncoder
-) extends MigrationClient with Logging {
+  def apply(
+    zkClient: KafkaZkClient,
+    zkConfigEncoder: PasswordEncoder
+  ): ZkMigrationClient = {
+    val topicClient = new ZkTopicMigrationClient(zkClient)
+    val configClient = new ZkConfigMigrationClient(zkClient, zkConfigEncoder)
+    val aclClient = new ZkAclMigrationClient(zkClient)
+    new ZkMigrationClient(zkClient, topicClient, configClient, aclClient)
+  }
 
   /**
    * Wrap a function such that any KeeperExceptions is captured and converted to a MigrationClientException.
@@ -66,7 +62,7 @@ class ZkMigrationClient(
    * differently by the caller.
    */
   @throws(classOf[MigrationClientException])
-  private def wrapZkException[T](fn: => T): T = {
+  def wrapZkException[T](fn: => T): T = {
     try {
       fn
     } catch {
@@ -78,11 +74,35 @@ class ZkMigrationClient(
     }
   }
 
+  @throws(classOf[MigrationClientException])
+  def logAndRethrow[T](logger: Logging, msg: String)(fn: => T): T = {
+    try {
+      fn
+    } catch {
+      case e: Throwable =>
+        logger.error(msg, e)
+        throw e
+    }
+  }
+}
+
+
+/**
+ * Migration client in KRaft controller responsible for handling communication to Zookeeper and
+ * the ZkBrokers present in the cluster. Methods that directly use KafkaZkClient should use the wrapZkException
+ * wrapper function in order to translate KeeperExceptions into something usable by the caller.
+ */
+class ZkMigrationClient(
+  zkClient: KafkaZkClient,
+  topicClient: TopicMigrationClient,
+  configClient: ConfigMigrationClient,
+  aclClient: AclMigrationClient
+) extends MigrationClient with Logging {
+
   override def getOrCreateMigrationRecoveryState(
     initialState: ZkMigrationLeadershipState
   ): ZkMigrationLeadershipState = wrapZkException {
       zkClient.createTopLevelPaths()
-      zkClient.createAclPaths()
       zkClient.getOrCreateMigrationState(initialState)
     }
 
@@ -121,145 +141,108 @@ class ZkMigrationClient(
     recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
     brokerIdConsumer: Consumer[Integer]
   ): Unit = wrapZkException {
-    val topics = zkClient.getAllTopicsInCluster()
-    val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
-    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
-    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
-      val partitions = partitionAssignments.keys.toSeq
-      val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
-      val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-      topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
-        .setName(topic)
-        .setTopicId(topicIdOpt.get), 0.toShort))
+    var topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+    topicClient.iterateTopics(
+      util.EnumSet.allOf(classOf[TopicVisitorInterest]),
+      new TopicVisitor() {
+      override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
+        if (!topicBatch.isEmpty) {
+          recordConsumer.accept(topicBatch)
+          topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+        }
 
-      partitionAssignments.foreach { case (topicPartition, replicaAssignment) =>
-        replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
-        replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
-        val replicaList = replicaAssignment.replicas.map(Integer.valueOf).asJava
+        topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+          .setName(topicName)
+          .setTopicId(topicId), 0.toShort))
+      }
+
+      override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
         val record = new PartitionRecord()
-          .setTopicId(topicIdOpt.get)
-          .setPartitionId(topicPartition.partition)
-          .setReplicas(replicaList)
-          .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
-          .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
-        leaderIsrAndControllerEpochs.get(topicPartition) match {
-          case Some(leaderIsrAndEpoch) => record
-              .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
-              .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
-              .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
-              .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
-              .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value())
-          case None =>
-            warn(s"Could not find partition state in ZK for $topicPartition. Initializing this partition " +
-              s"with ISR={$replicaList} and leaderEpoch=0.")
-            record
-              .setIsr(replicaList)
-              .setLeader(replicaList.get(0))
-              .setLeaderEpoch(0)
-              .setPartitionEpoch(0)
-              .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
-        }
+          .setTopicId(topicIdPartition.topicId())
+          .setPartitionId(topicIdPartition.partition())
+          .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
+          .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
+          .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
+          .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
+          .setLeader(partitionRegistration.leader)
+          .setLeaderEpoch(partitionRegistration.leaderEpoch)
+          .setPartitionEpoch(partitionRegistration.partitionEpoch)
+          .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
+        partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
+        partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
         topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
       }
 
-      val props = topicConfigs(topic)
-      props.forEach { case (key: Object, value: Object) =>
-        topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
-          .setResourceType(ConfigResource.Type.TOPIC.id)
-          .setResourceName(topic)
-          .setName(key.toString)
-          .setValue(value.toString), 0.toShort))
+      override def visitConfigs(topicName: String, topicProps: Properties): Unit = {
+        topicProps.forEach((key: Any, value: Any) => {
+          topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+            .setResourceType(ConfigResource.Type.TOPIC.id)
+            .setResourceName(topicName)
+            .setName(key.toString)
+            .setValue(value.toString), 0.toShort))
+        })
       }
+    })
+
+    if (!topicBatch.isEmpty) {
       recordConsumer.accept(topicBatch)
     }
   }
 
   def migrateBrokerConfigs(
-    recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
+    recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+    brokerIdConsumer: Consumer[Integer]
   ): Unit = wrapZkException {
-    val batch = new util.ArrayList[ApiMessageAndVersion]()
-
-    val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
-    zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
-      val brokerResource = if (broker == ConfigEntityName.Default) {
-        ""
-      } else {
-        broker
-      }
-      props.asScala.foreach { case (key, value) =>
-        val newValue = if (DynamicBrokerConfig.isPasswordConfig(key))
-          zkConfigEncoder.decode(value).value
-        else
-          value
-
+    configClient.iterateBrokerConfigs((broker, props) => {
+      brokerIdConsumer.accept(Integer.valueOf(broker))
+      val batch = new util.ArrayList[ApiMessageAndVersion]()
+      props.forEach((key, value) => {
         batch.add(new ApiMessageAndVersion(new ConfigRecord()
           .setResourceType(ConfigResource.Type.BROKER.id)
-          .setResourceName(brokerResource)
+          .setResourceName(broker)
           .setName(key)
-          .setValue(newValue), 0.toShort))
+          .setValue(value), 0.toShort))
+      })
+      if (!batch.isEmpty) {
+        recordConsumer.accept(batch)
       }
-    }
-    if (!batch.isEmpty) {
-      recordConsumer.accept(batch)
-    }
+    })
   }
 
   def migrateClientQuotas(
     recordConsumer: Consumer[util.List[ApiMessageAndVersion]]
   ): Unit = wrapZkException {
-    val adminZkClient = new AdminZkClient(zkClient)
-
-    def migrateEntityType(entityType: String): Unit = {
-      adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
-        val entity = new EntityData().setEntityType(entityType).setEntityName(name)
+    configClient.iterateClientQuotas(new ClientQuotaVisitor {
+      override def visitClientQuota(
+        entityDataList: util.List[ClientQuotaRecord.EntityData],
+        quotas: util.Map[String, lang.Double]
+      ): Unit = {
         val batch = new util.ArrayList[ApiMessageAndVersion]()
-        ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
-          val propertyValue = props.getProperty(mechanism.mechanismName)
-          if (propertyValue != null) {
-            val scramCredentials =  ScramCredentialUtils.credentialFromString(propertyValue)
-            batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord()
-              .setName(name)
-              .setMechanism(mechanism.`type`)
-              .setSalt(scramCredentials.salt)
-              .setStoredKey(scramCredentials.storedKey)
-              .setServerKey(scramCredentials.serverKey)
-              .setIterations(scramCredentials.iterations), 0.toShort))
-            props.remove(mechanism.mechanismName)
-          }
-        }
-        ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+        quotas.forEach((key, value) => {
           batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
-            .setEntity(List(entity).asJava)
+            .setEntity(entityDataList)
             .setKey(key)
             .setValue(value), 0.toShort))
-        }
+        })
         recordConsumer.accept(batch)
       }
-    }
-
-    migrateEntityType(ConfigType.User)
-    migrateEntityType(ConfigType.Client)
-    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
-      // Taken from ZkAdminManager
-      val components = name.split("/")
-      if (components.size != 3 || components(1) != "clients")
-        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
-      val entity = List(
-        new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
-        new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
-      )
 
-      val batch = new util.ArrayList[ApiMessageAndVersion]()
-      ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
-        batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
-          .setEntity(entity.asJava)
-          .setKey(key)
-          .setValue(value), 0.toShort))
+      override def visitScramCredential(
+        userName: String,
+        scramMechanism: ScramMechanism,
+        scramCredential: ScramCredential
+      ): Unit = {
+        val batch = new util.ArrayList[ApiMessageAndVersion]()
+        batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord()
+          .setName(userName)
+          .setMechanism(scramMechanism.`type`)
+          .setSalt(scramCredential.salt)
+          .setStoredKey(scramCredential.storedKey)
+          .setServerKey(scramCredential.serverKey)
+          .setIterations(scramCredential.iterations), 0.toShort))
+        recordConsumer.accept(batch)
       }
-      recordConsumer.accept(batch)
-    }
-
-    migrateEntityType(ConfigType.Ip)
+    })
   }
 
   def migrateProducerId(
@@ -277,20 +260,8 @@ class ZkMigrationClient(
     }
   }
 
-  override def iterateAcls(aclConsumer: BiConsumer[ResourcePattern, util.Set[AccessControlEntry]]): Unit = {
-    // This is probably fairly inefficient, but it preserves the semantics from AclAuthorizer (which is non-trivial)
-    var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
-    def updateAcls(resourcePattern: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
-      allAcls = allAcls.updated(resourcePattern, versionedAcls)
-    }
-    AclAuthorizer.loadAllAcls(zkClient, this, updateAcls)
-    allAcls.foreach { case (resourcePattern, versionedAcls) =>
-      aclConsumer.accept(resourcePattern, versionedAcls.acls.map(_.ace).asJava)
-    }
-  }
-
   def migrateAcls(recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
-    iterateAcls(new util.function.BiConsumer[ResourcePattern, util.Set[AccessControlEntry]]() {
+    aclClient.iterateAcls(new util.function.BiConsumer[ResourcePattern, util.Set[AccessControlEntry]]() {
       override def accept(resourcePattern: ResourcePattern, acls: util.Set[AccessControlEntry]): Unit = {
         val batch = new util.ArrayList[ApiMessageAndVersion]()
         acls.asScala.foreach { entry =>
@@ -320,7 +291,7 @@ class ZkMigrationClient(
     brokerIdConsumer: Consumer[Integer]
   ): Unit = {
     migrateTopics(batchConsumer, brokerIdConsumer)
-    migrateBrokerConfigs(batchConsumer)
+    migrateBrokerConfigs(batchConsumer, brokerIdConsumer)
     migrateClientQuotas(batchConsumer)
     migrateProducerId(batchConsumer)
     migrateAcls(batchConsumer)
@@ -330,213 +301,6 @@ class ZkMigrationClient(
     new util.HashSet[Integer](zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava)
   }
 
-  override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] = wrapZkException {
-    val topics = zkClient.getAllTopicsInCluster()
-    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
-    val brokersWithAssignments = new util.HashSet[Integer]()
-    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>
-      assignments.values.foreach { assignment =>
-        assignment.replicas.foreach { brokerId => brokersWithAssignments.add(brokerId) }
-      }
-    }
-    brokersWithAssignments
-  }
-
-  override def createTopic(
-    topicName: String,
-    topicId: Uuid,
-    partitions: util.Map[Integer, PartitionRegistration],
-    state: ZkMigrationLeadershipState
-  ): ZkMigrationLeadershipState = wrapZkException {
-    val assignments = partitions.asScala.map { case (partitionId, partition) =>
-      new TopicPartition(topicName, partitionId) ->
-        ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
-    }
-
-    val createTopicZNode = {
-      val path = TopicZNode.path(topicName)
-      CreateRequest(
-        path,
-        TopicZNode.encode(Some(topicId), assignments),
-        zkClient.defaultAcls(path),
-        CreateMode.PERSISTENT)
-    }
-    val createPartitionsZNode = {
-      val path = TopicPartitionsZNode.path(topicName)
-      CreateRequest(
-        path,
-        null,
-        zkClient.defaultAcls(path),
-        CreateMode.PERSISTENT)
-    }
-
-    val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
-      val topicPartition = new TopicPartition(topicName, partitionId)
-      Seq(
-        createTopicPartition(topicPartition),
-        createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
-      )
-    }
-
-    val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
-    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
-    val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
-    if (resultCodes(TopicZNode.path(topicName)).equals(Code.NODEEXISTS)) {
-      // topic already created, just return
-      state
-    } else if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
-      // ok
-      state.withMigrationZkVersion(migrationZkVersion)
-    } else {
-      // not ok
-      throw new MigrationClientException(s"Failed to create or update topic $topicName. ZK operation had results $resultCodes")
-    }
-  }
-
-  private def createTopicPartition(
-    topicPartition: TopicPartition
-  ): CreateRequest = wrapZkException {
-    val path = TopicPartitionZNode.path(topicPartition)
-    CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
-  }
-
-  private def partitionStatePathAndData(
-    topicPartition: TopicPartition,
-    partitionRegistration: PartitionRegistration,
-    controllerEpoch: Int
-  ): (String, Array[Byte]) = {
-    val path = TopicPartitionStateZNode.path(topicPartition)
-    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(LeaderAndIsr(
-      partitionRegistration.leader,
-      partitionRegistration.leaderEpoch,
-      partitionRegistration.isr.toList,
-      partitionRegistration.leaderRecoveryState,
-      partitionRegistration.partitionEpoch), controllerEpoch))
-    (path, data)
-  }
-
-  private def createTopicPartitionState(
-    topicPartition: TopicPartition,
-    partitionRegistration: PartitionRegistration,
-    controllerEpoch: Int
-  ): CreateRequest = {
-    val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
-    CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
-  }
-
-  private def updateTopicPartitionState(
-    topicPartition: TopicPartition,
-    partitionRegistration: PartitionRegistration,
-    controllerEpoch: Int
-  ): SetDataRequest = {
-    val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
-    SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
-  }
-
-  override def updateTopicPartitions(
-    topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
-    state: ZkMigrationLeadershipState
-  ): ZkMigrationLeadershipState = wrapZkException {
-    val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
-      partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
-        val topicPartition = new TopicPartition(topicName, partitionId)
-        Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
-      }
-    }
-    if (requests.isEmpty) {
-      state
-    } else {
-      val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
-      val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
-      if (resultCodes.forall { case (_, code) => code.equals(Code.OK) } ) {
-        state.withMigrationZkVersion(migrationZkVersion)
-      } else {
-        throw new MigrationClientException(s"Failed to update partition states: $topicPartitions. ZK transaction had results $resultCodes")
-      }
-    }
-  }
-
-  // Try to update an entity config and the migration state. If NoNode is encountered, it probably means we
-  // need to recursively create the parent ZNode. In this case, return None.
-  def tryWriteEntityConfig(
-    entityType: String,
-    path: String,
-    props: Properties,
-    create: Boolean,
-    state: ZkMigrationLeadershipState
-  ): Option[ZkMigrationLeadershipState] = wrapZkException {
-    val configData = ConfigEntityZNode.encode(props)
-
-    val requests = if (create) {
-      Seq(CreateRequest(ConfigEntityZNode.path(entityType, path), configData, zkClient.defaultAcls(path), CreateMode.PERSISTENT))
-    } else {
-      Seq(SetDataRequest(ConfigEntityZNode.path(entityType, path), configData, ZkVersion.MatchAnyVersion))
-    }
-    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
-    if (!create && responses.head.resultCode.equals(Code.NONODE)) {
-      // Not fatal. Just means we need to Create this node instead of SetData
-      None
-    } else if (responses.head.resultCode.equals(Code.OK)) {
-      Some(state.withMigrationZkVersion(migrationZkVersion))
-    } else {
-      throw KeeperException.create(responses.head.resultCode, path)
-    }
-  }
-
-  override def writeClientQuotas(
-    entity: util.Map[String, String],
-    quotas: util.Map[String, java.lang.Double],
-    scram: util.Map[String, String],
-    state: ZkMigrationLeadershipState
-  ): ZkMigrationLeadershipState = wrapZkException {
-    val entityMap = entity.asScala
-    val hasUser = entityMap.contains(ClientQuotaEntity.USER)
-    val hasClient = entityMap.contains(ClientQuotaEntity.CLIENT_ID)
-    val hasIp = entityMap.contains(ClientQuotaEntity.IP)
-    val props = new Properties()
-    // We store client quota values as strings in the ZK JSON
-    scram.forEach { case (key, value) => props.put(key, value.toString) }
-    quotas.forEach { case (key, value) => props.put(key, value.toString) }
-    val (configType, path) = if (hasUser && !hasClient) {
-      (Some(ConfigType.User), Some(entityMap(ClientQuotaEntity.USER)))
-    } else if (hasUser && hasClient) {
-      (Some(ConfigType.User), Some(s"${entityMap(ClientQuotaEntity.USER)}/clients/${entityMap(ClientQuotaEntity.CLIENT_ID)}"))
-    } else if (hasClient) {
-      (Some(ConfigType.Client), Some(entityMap(ClientQuotaEntity.CLIENT_ID)))
-    } else if (hasIp) {
-      (Some(ConfigType.Ip), Some(entityMap(ClientQuotaEntity.IP)))
-    } else {
-      (None, None)
-    }
-
-    if (path.isEmpty) {
-      error(s"Skipping unknown client quota entity $entity")
-      return state
-    }
-
-    // Try to write the client quota configs once with create=false, and again with create=true if the first operation fails
-    tryWriteEntityConfig(configType.get, path.get, props, create=false, state) match {
-      case Some(newState) =>
-        newState
-      case None =>
-        // If we didn't update the migration state, we failed to write the client quota. Try again
-        // after recursively create its parent znodes
-        val createPath = if (hasUser && hasClient) {
-          s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ClientQuotaEntity.USER)}/clients"
-        } else {
-          ConfigEntityTypeZNode.path(configType.get)
-        }
-        zkClient.createRecursive(createPath, throwIfPathExists=false)
-        debug(s"Recursively creating ZNode $createPath and attempting to write $entity quotas a second time.")
-
-        tryWriteEntityConfig(configType.get, path.get, props, create=true, state) match {
-          case Some(newStateSecondTry) => newStateSecondTry
-          case None => throw new MigrationClientException(
-            s"Could not write client quotas for $entity on second attempt when using Create instead of SetData")
-        }
-    }
-  }
-
   override def writeProducerId(
     nextProducerId: Long,
     state: ZkMigrationLeadershipState
@@ -549,116 +313,9 @@ class ZkMigrationClient(
     state.withMigrationZkVersion(migrationZkVersion)
   }
 
-  override def writeConfigs(
-    resource: ConfigResource,
-    configs: util.Map[String, String],
-    state: ZkMigrationLeadershipState
-  ): ZkMigrationLeadershipState = wrapZkException {
-    val configType = resource.`type`() match {
-      case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
-      case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
-      case _ => None
-    }
-
-    val configName = resource.name()
-    if (configType.isDefined) {
-      val props = new Properties()
-      configs.forEach { case (key, value) => props.put(key, value) }
-      tryWriteEntityConfig(configType.get, configName, props, create=false, state) match {
-        case Some(newState) =>
-          newState
-        case None =>
-          val createPath = ConfigEntityTypeZNode.path(configType.get)
-          debug(s"Recursively creating ZNode $createPath and attempting to write $resource configs a second time.")
-          zkClient.createRecursive(createPath, throwIfPathExists=false)
-
-          tryWriteEntityConfig(configType.get, configName, props, create=true, state) match {
-            case Some(newStateSecondTry) => newStateSecondTry
-            case None => throw new MigrationClientException(
-              s"Could not write ${configType.get} configs on second attempt when using Create instead of SetData.")
-          }
-      }
-    } else {
-      debug(s"Not updating ZK for $resource since it is not a Broker or Topic entity.")
-      state
-    }
-  }
-
-  private def aclChangeNotificationRequest(resourcePattern: ResourcePattern): CreateRequest = {
-    // ZK broker needs the ACL change notification znode to be updated in order to process the new ACLs
-    val aclChange = ZkAclStore(resourcePattern.patternType).changeStore.createChangeNode(resourcePattern)
-    CreateRequest(aclChange.path, aclChange.bytes, zkClient.defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
-  }
-
-  private def tryWriteAcls(
-    resourcePattern: ResourcePattern,
-    aclEntries: Set[AclEntry],
-    create: Boolean,
-    state: ZkMigrationLeadershipState
-  ): Option[ZkMigrationLeadershipState] = wrapZkException {
-    val aclData = ResourceZNode.encode(aclEntries)
-
-    val request = if (create) {
-      val path = ResourceZNode.path(resourcePattern)
-      CreateRequest(path, aclData, zkClient.defaultAcls(path), CreateMode.PERSISTENT)
-    } else {
-      SetDataRequest(ResourceZNode.path(resourcePattern), aclData, ZkVersion.MatchAnyVersion)
-    }
-
-    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
-    if (responses.head.resultCode.equals(Code.NONODE)) {
-      // Need to call this method again with create=true
-      None
-    } else {
-      // Write the ACL notification outside of a metadata multi-op
-      zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
-      Some(state.withMigrationZkVersion(migrationZkVersion))
-    }
-  }
-
-  override def writeAddedAcls(
-    resourcePattern: ResourcePattern,
-    newAcls: util.List[AccessControlEntry],
-    state: ZkMigrationLeadershipState
-  ): ZkMigrationLeadershipState = {
-
-    val existingAcls = AclAuthorizer.getAclsFromZk(zkClient, resourcePattern)
-    val addedAcls = newAcls.asScala.map(new AclEntry(_)).toSet
-    val updatedAcls = existingAcls.acls ++ addedAcls
-
-    tryWriteAcls(resourcePattern, updatedAcls, create=false, state) match {
-      case Some(newState) => newState
-      case None => tryWriteAcls(resourcePattern, updatedAcls, create=true, state) match {
-        case Some(newState) => newState
-        case None => throw new MigrationClientException(s"Could not write ACLs for resource pattern $resourcePattern")
-      }
-    }
-  }
-
-  override def removeDeletedAcls(
-    resourcePattern: ResourcePattern,
-    deletedAcls: util.List[AccessControlEntry],
-    state: ZkMigrationLeadershipState
-  ): ZkMigrationLeadershipState = wrapZkException {
+  override def topicClient(): TopicMigrationClient = topicClient
 
-    val existingAcls = AclAuthorizer.getAclsFromZk(zkClient, resourcePattern)
-    val removedAcls = deletedAcls.asScala.map(new AclEntry(_)).toSet
-    val remainingAcls = existingAcls.acls -- removedAcls
+  override def configClient(): ConfigMigrationClient = configClient
 
-    val request = if (remainingAcls.isEmpty) {
-      DeleteRequest(ResourceZNode.path(resourcePattern), ZkVersion.MatchAnyVersion)
-    } else {
-      val aclData = ResourceZNode.encode(remainingAcls)
-      SetDataRequest(ResourceZNode.path(resourcePattern), aclData, ZkVersion.MatchAnyVersion)
-    }
-
-    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
-    if (responses.head.resultCode.equals(Code.OK) || responses.head.resultCode.equals(Code.NONODE)) {
-      // Write the ACL notification outside of a metadata multi-op
-      zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
-      state.withMigrationZkVersion(migrationZkVersion)
-    } else {
-      throw new MigrationClientException(s"Could not delete ACL for resource pattern $resourcePattern")
-    }
-  }
+  override def aclClient(): AclMigrationClient = aclClient
 }
diff --git a/core/src/main/scala/kafka/zk/migration/ZkAclMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkAclMigrationClient.scala
new file mode 100644
index 00000000000..482476ff4bb
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/migration/ZkAclMigrationClient.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk.migration
+
+import kafka.security.authorizer.AclAuthorizer.{ResourceOrdering, VersionedAcls}
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
+import kafka.utils.Logging
+import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
+import kafka.zk.{KafkaZkClient, ResourceZNode, ZkAclStore, ZkVersion}
+import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
+import org.apache.kafka.common.acl.AccessControlEntry
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.metadata.migration.{AclMigrationClient, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException.Code
+
+import java.util
+import java.util.function.BiConsumer
+import scala.jdk.CollectionConverters._
+
+class ZkAclMigrationClient(
+  zkClient: KafkaZkClient
+) extends AclMigrationClient with Logging {
+
+  private def aclChangeNotificationRequest(resourcePattern: ResourcePattern): CreateRequest = {
+    // ZK broker needs the ACL change notification znode to be updated in order to process the new ACLs
+    val aclChange = ZkAclStore(resourcePattern.patternType).changeStore.createChangeNode(resourcePattern)
+    CreateRequest(aclChange.path, aclChange.bytes, zkClient.defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
+  }
+
+  private def tryWriteAcls(
+    resourcePattern: ResourcePattern,
+    aclEntries: Set[AclEntry],
+    create: Boolean,
+    state: ZkMigrationLeadershipState
+  ): Option[ZkMigrationLeadershipState] = wrapZkException {
+    val aclData = ResourceZNode.encode(aclEntries)
+
+    val request = if (create) {
+      val path = ResourceZNode.path(resourcePattern)
+      CreateRequest(path, aclData, zkClient.defaultAcls(path), CreateMode.PERSISTENT)
+    } else {
+      SetDataRequest(ResourceZNode.path(resourcePattern), aclData, ZkVersion.MatchAnyVersion)
+    }
+
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
+    if (responses.head.resultCode.equals(Code.NONODE)) {
+      // Need to call this method again with create=true
+      None
+    } else {
+      // Write the ACL notification outside of a metadata multi-op
+      zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
+      Some(state.withMigrationZkVersion(migrationZkVersion))
+    }
+  }
+
+  override def writeResourceAcls(
+    resourcePattern: ResourcePattern,
+    aclsToWrite: util.Collection[AccessControlEntry],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+    val acls = aclsToWrite.asScala.map(new AclEntry(_)).toSet
+    tryWriteAcls(resourcePattern, acls, create = false, state) match {
+      case Some(newState) => newState
+      case None => tryWriteAcls(resourcePattern, acls, create = true, state) match {
+        case Some(newState) => newState
+        case None => throw new MigrationClientException(s"Could not write ACLs for resource pattern $resourcePattern")
+      }
+    }
+  }
+
+  override def deleteResource(
+    resourcePattern: ResourcePattern,
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+    val request = DeleteRequest(ResourceZNode.path(resourcePattern), ZkVersion.MatchAnyVersion)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
+    if (responses.head.resultCode.equals(Code.OK) || responses.head.resultCode.equals(Code.NONODE)) {
+      // Write the ACL notification outside of a metadata multi-op
+      zkClient.retryRequestUntilConnected(aclChangeNotificationRequest(resourcePattern))
+      state.withMigrationZkVersion(migrationZkVersion)
+    } else {
+      throw new MigrationClientException(s"Could not delete ACL for resource pattern $resourcePattern")
+    }
+  }
+
+  override def iterateAcls(
+    aclConsumer: BiConsumer[ResourcePattern, util.Set[AccessControlEntry]]
+  ): Unit = {
+    // This is probably fairly inefficient, but it preserves the semantics from AclAuthorizer (which is non-trivial)
+    var allAcls = new scala.collection.immutable.TreeMap[ResourcePattern, VersionedAcls]()(new ResourceOrdering)
+    def updateAcls(resourcePattern: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
+      allAcls = allAcls.updated(resourcePattern, versionedAcls)
+    }
+    AclAuthorizer.loadAllAcls(zkClient, this, updateAcls)
+    allAcls.foreach { case (resourcePattern, versionedAcls) =>
+      logAndRethrow(this, s"Error in ACL consumer. Resource was $resourcePattern.") {
+        aclConsumer.accept(resourcePattern, versionedAcls.acls.map(_.ace).asJava)
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
new file mode 100644
index 00000000000..dbcc1d99b93
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk.migration
+
+import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, DynamicConfig, ZkAdminManager}
+import kafka.utils.{Logging, PasswordEncoder}
+import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
+import kafka.zk._
+import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
+import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
+import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
+import org.apache.kafka.metadata.migration.{ConfigMigrationClient, MigrationClientException, ZkMigrationLeadershipState}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import java.{lang, util}
+import java.util.Properties
+import java.util.function.BiConsumer
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class ZkConfigMigrationClient(
+  zkClient: KafkaZkClient,
+  passwordEncoder: PasswordEncoder
+) extends ConfigMigrationClient with Logging {
+
+  val adminZkClient = new AdminZkClient(zkClient)
+
+
+  /**
+   * In ZK, we use the special string "&lt;default&gt;" to represent the default entity.
+   * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string
+   * to the special KRaft string.
+   */
+  private def fromZkEntityName(entityName: String): String = {
+    if (entityName.equals(ConfigEntityName.Default)) {
+      ""
+    } else {
+      entityName
+    }
+  }
+
+  private def toZkEntityName(entityName: String): String = {
+    if (entityName.isEmpty) {
+      ConfigEntityName.Default
+    } else {
+      entityName
+    }
+  }
+
+  private def buildEntityData(entityType: String, entityName: String): EntityData = {
+    new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName))
+  }
+
+
+  override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = {
+    def migrateEntityType(zkEntityType: String, entityType: String): Unit = {
+      adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) =>
+        val entity = List(buildEntityData(entityType, name)).asJava
+
+        ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
+          val propertyValue = props.getProperty(mechanism.mechanismName)
+          if (propertyValue != null) {
+            val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue)
+            logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") {
+              visitor.visitScramCredential(name, mechanism, scramCredentials)
+            }
+            props.remove(mechanism.mechanismName)
+          }
+        }
+
+        val quotaMap = ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).map {
+          case (key, value) => key -> lang.Double.valueOf(value)
+        }.toMap.asJava
+
+        if (!quotaMap.isEmpty) {
+          logAndRethrow(this, s"Error in client quota visitor. Entity was $entity.") {
+            visitor.visitClientQuota(entity, quotaMap)
+          }
+        }
+      }
+    }
+
+    migrateEntityType(ConfigType.User, ClientQuotaEntity.USER)
+    migrateEntityType(ConfigType.Client, ClientQuotaEntity.CLIENT_ID)
+
+    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+      // Taken from ZkAdminManager
+      val components = name.split("/")
+      if (components.size != 3 || components(1) != "clients")
+        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+      val entity = List(
+        buildEntityData(ClientQuotaEntity.USER, components(0)),
+        buildEntityData(ClientQuotaEntity.CLIENT_ID, components(2))
+      )
+      val quotaMap = props.asScala.map { case (key, value) =>
+        val doubleValue = try lang.Double.valueOf(value) catch {
+          case _: NumberFormatException =>
+            throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
+        }
+        key -> doubleValue
+      }.asJava
+      logAndRethrow(this, s"Error in client quota entity visitor. Entity was $entity.") {
+        visitor.visitClientQuota(entity.asJava, quotaMap)
+      }
+    }
+
+    migrateEntityType(ConfigType.Ip, ClientQuotaEntity.IP)
+  }
+
+  override def iterateBrokerConfigs(configConsumer: BiConsumer[String, util.Map[String, String]]): Unit = {
+    val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+    zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
+      val brokerResource = fromZkEntityName(broker)
+      val decodedProps = props.asScala.map { case (key, value) =>
+        if (DynamicBrokerConfig.isPasswordConfig(key))
+          key -> passwordEncoder.decode(value).value
+        else
+          key -> value
+      }.toMap.asJava
+
+      logAndRethrow(this, s"Error in broker config consumer. Broker was $brokerResource.") {
+        configConsumer.accept(brokerResource, decodedProps)
+      }
+    }
+  }
+
+  override def writeConfigs(
+    configResource: ConfigResource,
+    configMap: util.Map[String, String],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = wrapZkException {
+    val configType = configResource.`type`() match {
+      case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
+      case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
+      case _ => None
+    }
+
+    val configName = toZkEntityName(configResource.name())
+    if (configType.isDefined) {
+      val props = new Properties()
+      configMap.forEach { case (key, value) => props.put(key, value) }
+      tryWriteEntityConfig(configType.get, configName, props, create = false, state) match {
+        case Some(newState) =>
+          newState
+        case None =>
+          val createPath = ConfigEntityTypeZNode.path(configType.get)
+          debug(s"Recursively creating ZNode $createPath and attempting to write $configResource configs a second time.")
+          zkClient.createRecursive(createPath, throwIfPathExists = false)
+
+          tryWriteEntityConfig(configType.get, configName, props, create = true, state) match {
+            case Some(newStateSecondTry) => newStateSecondTry
+            case None => throw new MigrationClientException(
+              s"Could not write ${configType.get} configs on second attempt when using Create instead of SetData.")
+          }
+      }
+    } else {
+      error(s"Not updating ZK for $configResource since it is not a Broker or Topic entity.")
+      state
+    }
+  }
+
+  override def deleteConfigs(
+    configResource: ConfigResource,
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = wrapZkException {
+    val configType = configResource.`type`() match {
+      case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
+      case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
+      case _ => None
+    }
+
+    val configName = toZkEntityName(configResource.name())
+    if (configType.isDefined) {
+      val path = ConfigEntityZNode.path(configType.get, configName)
+      val requests = Seq(DeleteRequest(path, ZkVersion.MatchAnyVersion))
+      val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
+
+      if (responses.head.resultCode.equals(Code.NONODE)) {
+        // Not fatal.
+        error(s"Did not delete $configResource since the node did not exist.")
+        state
+      } else if (responses.head.resultCode.equals(Code.OK)) {
+        // Write the notification znode if our update was successful
+        zkClient.createConfigChangeNotification(s"$configType/$configName")
+        state.withMigrationZkVersion(migrationZkVersion)
+      } else {
+        throw KeeperException.create(responses.head.resultCode, path)
+      }
+    } else {
+      error(s"Not updating ZK for $configResource since it is not a Broker or Topic entity.")
+      state
+    }
+  }
+
+  override def writeClientQuotas(
+    entity: util.Map[String, String],
+    quotas: util.Map[String, java.lang.Double],
+    scram: util.Map[String, String],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = wrapZkException {
+    val entityMap = entity.asScala
+    val user = entityMap.get(ClientQuotaEntity.USER).map(toZkEntityName)
+    val client = entityMap.get(ClientQuotaEntity.CLIENT_ID).map(toZkEntityName)
+    val ip = entityMap.get(ClientQuotaEntity.IP).map(toZkEntityName)
+    val props = new Properties()
+
+    val (configType, path, configKeys) = if (user.isDefined && client.isEmpty) {
+      (Some(ConfigType.User), user, DynamicConfig.User.configKeys)
+    } else if (user.isDefined && client.isDefined) {
+      (Some(ConfigType.User), Some(s"${user.get}/clients/${client.get}"),
+        DynamicConfig.User.configKeys)
+    } else if (client.isDefined) {
+      (Some(ConfigType.Client), client, DynamicConfig.Client.configKeys)
+    } else if (ip.isDefined) {
+      (Some(ConfigType.Ip), ip, DynamicConfig.Ip.configKeys)
+    } else {
+      (None, None, Map.empty.asJava)
+    }
+
+    if (path.isEmpty) {
+      error(s"Skipping unknown client quota entity $entity")
+      return state
+    }
+
+    // This logic is duplicated from ZkAdminManager
+    quotas.forEach { case (key, value) =>
+      val configKey = configKeys.get(key)
+      if (configKey == null) {
+        throw new MigrationClientException(s"Invalid configuration key ${key}")
+      } else {
+        configKey.`type` match {
+          case ConfigDef.Type.DOUBLE =>
+            props.setProperty(key, value.toString)
+          case ConfigDef.Type.LONG | ConfigDef.Type.INT =>
+            val epsilon = 1e-6
+            val intValue = if (configKey.`type` == ConfigDef.Type.LONG)
+              (value + epsilon).toLong
+            else
+              (value + epsilon).toInt
+            if ((intValue.toDouble - value).abs > epsilon)
+              throw new InvalidRequestException(s"Configuration ${key} must be a ${configKey.`type`} value")
+            props.setProperty(key, intValue.toString)
+          case _ =>
+            throw new MigrationClientException(s"Unexpected config type ${configKey.`type`}")
+        }
+      }
+    }
+    scram.forEach { case (key, value) => props.put(key, value) }
+
+    // Try to write the client quota configs once with create=false, and again with create=true if the first operation fails
+    tryWriteEntityConfig(configType.get, path.get, props, create = false, state) match {
+      case Some(newState) =>
+        newState
+      case None =>
+        // If we didn't update the migration state, we failed to write the client quota. Try again
+        // after recursively create its parent znodes
+        val createPath = if (user.isDefined && client.isDefined) {
+          s"${ConfigEntityTypeZNode.path(configType.get)}/${user.get}/clients"
+        } else {
+          ConfigEntityTypeZNode.path(configType.get)
+        }
+        zkClient.createRecursive(createPath, throwIfPathExists = false)
+        debug(s"Recursively creating ZNode $createPath and attempting to write $entity quotas a second time.")
+
+        tryWriteEntityConfig(configType.get, path.get, props, create = true, state) match {
+          case Some(newStateSecondTry) => newStateSecondTry
+          case None => throw new MigrationClientException(
+            s"Could not write client quotas for $entity on second attempt when using Create instead of SetData")
+        }
+    }
+  }
+
+  // Try to update an entity config and the migration state. If NoNode is encountered, it probably means we
+  // need to recursively create the parent ZNode. In this case, return None.
+  private def tryWriteEntityConfig(
+    entityType: String,
+    path: String,
+    props: Properties,
+    create: Boolean,
+    state: ZkMigrationLeadershipState
+  ): Option[ZkMigrationLeadershipState] = wrapZkException {
+    val configData = ConfigEntityZNode.encode(props)
+    val requests = if (create) {
+      Seq(CreateRequest(ConfigEntityZNode.path(entityType, path), configData, zkClient.defaultAcls(path), CreateMode.PERSISTENT))
+    } else {
+      Seq(SetDataRequest(ConfigEntityZNode.path(entityType, path), configData, ZkVersion.MatchAnyVersion))
+    }
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
+    if (!create && responses.head.resultCode.equals(Code.NONODE)) {
+      // Not fatal. Just means we need to Create this node instead of SetData
+      None
+    } else if (responses.head.resultCode.equals(Code.OK)) {
+      // Write the notification znode if our update was successful
+      zkClient.createConfigChangeNotification(s"$entityType/$path")
+      Some(state.withMigrationZkVersion(migrationZkVersion))
+    } else {
+      throw KeeperException.create(responses.head.resultCode, path)
+    }
+  }
+}
+
diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
new file mode 100644
index 00000000000..a51b7c808c2
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.server.ConfigType
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
+import kafka.zk._
+import kafka.zookeeper.{CreateRequest, DeleteRequest, GetChildrenRequest, SetDataRequest}
+import org.apache.kafka.common.metadata.PartitionRecord
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.metadata.migration.TopicMigrationClient.TopicVisitorInterest
+import org.apache.kafka.metadata.migration.{MigrationClientException, TopicMigrationClient, ZkMigrationLeadershipState}
+import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
+import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.KeeperException.Code
+
+import java.util
+import scala.collection.Seq
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+
+class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClient with Logging {
+  override def iterateTopics(
+    interests: util.EnumSet[TopicVisitorInterest],
+    visitor: TopicMigrationClient.TopicVisitor,
+  ): Unit = wrapZkException {
+    if (!interests.contains(TopicVisitorInterest.TOPICS)) {
+      throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.")
+    }
+    val topics = zkClient.getAllTopicsInCluster()
+    val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
+      val topicAssignment = partitionAssignments.map { case (partition, assignment) =>
+        partition.partition().asInstanceOf[Integer] -> assignment.replicas.map(Integer.valueOf).asJava
+      }.toMap.asJava
+      logAndRethrow(this, s"Error in topic consumer. Topic was $topic.") {
+        visitor.visitTopic(topic, topicIdOpt.get, topicAssignment)
+      }
+      if (interests.contains(TopicVisitorInterest.PARTITIONS)) {
+        val partitions = partitionAssignments.keys.toSeq
+        val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
+        partitionAssignments.foreach { case (topicPartition, replicaAssignment) =>
+          val replicaList = replicaAssignment.replicas.map(Integer.valueOf).asJava
+          val record = new PartitionRecord()
+            .setTopicId(topicIdOpt.get)
+            .setPartitionId(topicPartition.partition)
+            .setReplicas(replicaList)
+            .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
+            .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
+          leaderIsrAndControllerEpochs.get(topicPartition) match {
+            case Some(leaderIsrAndEpoch) =>
+              record
+                .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+                .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
+                .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
+                .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
+                .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value())
+            case None =>
+              warn(s"Could not find partition state in ZK for $topicPartition. Initializing this partition " +
+                s"with ISR={$replicaList} and leaderEpoch=0.")
+              record
+                .setIsr(replicaList)
+                .setLeader(replicaList.get(0))
+                .setLeaderEpoch(0)
+                .setPartitionEpoch(0)
+                .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+          }
+          logAndRethrow(this, s"Error in partition consumer. TopicPartition was $topicPartition.") {
+            visitor.visitPartition(new TopicIdPartition(topicIdOpt.get, topicPartition), new PartitionRegistration(record))
+          }
+        }
+      }
+      if (interests.contains(TopicVisitorInterest.CONFIGS)) {
+        val props = topicConfigs(topic)
+        logAndRethrow(this, s"Error in topic config consumer. Topic was $topic.") {
+          visitor.visitConfigs(topic, props)
+        }
+      }
+    }
+  }
+
+  override def createTopic(
+    topicName: String,
+    topicId: Uuid,
+    partitions: util.Map[Integer, PartitionRegistration],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = wrapZkException {
+
+    val assignments = partitions.asScala.map { case (partitionId, partition) =>
+      new TopicPartition(topicName, partitionId) ->
+        ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
+    }
+
+    val createTopicZNode = {
+      val path = TopicZNode.path(topicName)
+      CreateRequest(
+        path,
+        TopicZNode.encode(Some(topicId), assignments),
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+    val createPartitionsZNode = {
+      val path = TopicPartitionsZNode.path(topicName)
+      CreateRequest(
+        path,
+        null,
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+
+    val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
+      val topicPartition = new TopicPartition(topicName, partitionId)
+      Seq(
+        createTopicPartition(topicPartition),
+        createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
+      )
+    }
+
+    val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
+    val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
+    if (resultCodes(TopicZNode.path(topicName)).equals(Code.NODEEXISTS)) {
+      // topic already created, just return
+      state
+    } else if (resultCodes.forall { case (_, code) => code.equals(Code.OK) }) {
+      // ok
+      state.withMigrationZkVersion(migrationZkVersion)
+    } else {
+      // not ok
+      throw new MigrationClientException(s"Failed to create or update topic $topicName. ZK operations had results $resultCodes")
+    }
+  }
+
+  private def recursiveChildren(path: String, acc: ArrayBuffer[String]): Unit = {
+    val topicChildZNodes = zkClient.retryRequestUntilConnected(GetChildrenRequest(path, registerWatch = false))
+    topicChildZNodes.children.foreach { child =>
+      recursiveChildren(s"$path/$child", acc)
+      acc.append(s"$path/$child")
+    }
+  }
+
+  private def recursiveChildren(path: String): Seq[String] = {
+    val buffer = new ArrayBuffer[String]()
+    recursiveChildren(path, buffer)
+    buffer.toSeq
+  }
+
+  override def deleteTopic(
+    topicName: String,
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = wrapZkException {
+    // Delete the partition state ZNodes recursively, then topic config, and finally the topic znode
+    val topicPath = TopicZNode.path(topicName)
+    val topicChildZNodes = recursiveChildren(topicPath)
+    val deleteRequests = topicChildZNodes.map { childPath =>
+      DeleteRequest(childPath, ZkVersion.MatchAnyVersion)
+    } ++ Seq(
+      DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topicName), ZkVersion.MatchAnyVersion),
+      DeleteRequest(TopicZNode.path(topicName), ZkVersion.MatchAnyVersion)
+    )
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests, state)
+    val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
+    if (responses.last.resultCode.equals(Code.OK)) {
+      state.withMigrationZkVersion(migrationZkVersion)
+    } else {
+      throw new MigrationClientException(s"Failed to delete topic $topicName. ZK operations had results $resultCodes")
+    }
+  }
+
+  override def updateTopicPartitions(
+    topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = wrapZkException {
+    val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
+      partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
+        val topicPartition = new TopicPartition(topicName, partitionId)
+        Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
+      }
+    }
+    if (requests.isEmpty) {
+      state
+    } else {
+      val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
+      val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
+      if (resultCodes.forall { case (_, code) => code.equals(Code.OK) }) {
+        state.withMigrationZkVersion(migrationZkVersion)
+      } else {
+        throw new MigrationClientException(s"Failed to update partition states: $topicPartitions. ZK transaction had results $resultCodes")
+      }
+    }
+  }
+
+  private def createTopicPartition(
+    topicPartition: TopicPartition
+  ): CreateRequest = wrapZkException {
+    val path = TopicPartitionZNode.path(topicPartition)
+    CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def partitionStatePathAndData(
+    topicPartition: TopicPartition,
+    partitionRegistration: PartitionRegistration,
+    controllerEpoch: Int
+  ): (String, Array[Byte]) = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    (path, data)
+  }
+
+  private def createTopicPartitionState(
+    topicPartition: TopicPartition,
+    partitionRegistration: PartitionRegistration,
+    controllerEpoch: Int
+  ): CreateRequest = {
+    val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
+    CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def updateTopicPartitionState(
+    topicPartition: TopicPartition,
+    partitionRegistration: PartitionRegistration,
+    controllerEpoch: Int
+  ): SetDataRequest = {
+    val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
+    SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
+  }
+}
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 33eefe75752..e92a6348e1e 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -105,7 +105,7 @@ class ZkMigrationIntegrationTest {
 
     val underlying = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying()
     val zkClient = underlying.zkClient
-    val migrationClient = new ZkMigrationClient(zkClient, PasswordEncoder.noop())
+    val migrationClient = ZkMigrationClient(zkClient, PasswordEncoder.noop())
     val verifier = new MetadataDeltaVerifier()
     migrationClient.readAllMetadata(batch => verifier.accept(batch), _ => { })
     verifier.verify { image =>
@@ -160,7 +160,7 @@ class ZkMigrationIntegrationTest {
       case None => PasswordEncoder.noop()
     }
 
-    val migrationClient = new ZkMigrationClient(zkClient, zkConfigEncoder)
+    val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
     var migrationState = migrationClient.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY)
     migrationState = migrationState.withNewKRaftController(3000, 42)
     migrationState = migrationClient.claimControllerLeadership(migrationState)
@@ -315,10 +315,10 @@ class ZkMigrationIntegrationTest {
 
   def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
     TestUtils.retry(10000) {
-      assertEquals("1000.0", zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("consumer_byte_rate"))
-      assertEquals("800.0", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
-      assertEquals("100.0", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
-      assertEquals("10.0", zkClient.getEntityConfigs(ConfigType.Ip, "8.8.8.8").getProperty("connection_creation_rate"))
+      assertEquals("1000", zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("consumer_byte_rate"))
+      assertEquals("800", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
+      assertEquals("100", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
+      assertEquals("10", zkClient.getEntityConfigs(ConfigType.Ip, "8.8.8.8").getProperty("connection_creation_rate"))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
deleted file mode 100644
index 8d65004b6e4..00000000000
--- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
+++ /dev/null
@@ -1,555 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.zk
-
-import kafka.api.LeaderAndIsr
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.coordinator.transaction.ProducerIdManager
-import kafka.security.authorizer.AclAuthorizer
-import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
-import kafka.server.{ConfigType, KafkaConfig, QuorumTestHarness, ZkAdminManager}
-import kafka.utils.{PasswordEncoder, TestUtils}
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.common.config.internals.QuotaConfigs
-import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.common.errors.ControllerMovedException
-import org.apache.kafka.common.metadata.{AccessControlEntryRecord, ConfigRecord, MetadataRecordType, ProducerIdsRecord}
-import org.apache.kafka.common.quota.ClientQuotaEntity
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
-import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
-import org.apache.kafka.common.security.scram.ScramCredential
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{SecurityUtils, Time}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
-import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
-import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.util.MockRandom
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
-
-import java.util.{Properties, UUID}
-import scala.collection.{Map, mutable}
-import scala.collection.mutable.ArrayBuffer
-import scala.jdk.CollectionConverters._
-
-/**
- * ZooKeeper integration tests that verify the interoperability of KafkaZkClient and ZkMigrationClient.
- */
-class ZkMigrationClientTest extends QuorumTestHarness {
-
-  private val InitialControllerEpoch: Int = 42
-  private val InitialKRaftEpoch: Int = 0
-
-  private var migrationClient: ZkMigrationClient = _
-
-  private var migrationState: ZkMigrationLeadershipState = _
-
-  private val SECRET = "secret"
-
-  private val encoder: PasswordEncoder = {
-    val encoderProps = new Properties()
-    encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation
-    encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the
-    val encoderConfig = new KafkaConfig(encoderProps)
-    PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get,
-      encoderConfig.passwordEncoderKeyFactoryAlgorithm,
-      encoderConfig.passwordEncoderCipherAlgorithm,
-      encoderConfig.passwordEncoderKeyLength,
-      encoderConfig.passwordEncoderIterations)
-  }
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    zkClient.createControllerEpochRaw(1)
-    migrationClient = new ZkMigrationClient(zkClient, encoder)
-    migrationState = initialMigrationState
-    migrationState = migrationClient.getOrCreateMigrationRecoveryState(migrationState)
-   }
-
-  private def initialMigrationState: ZkMigrationLeadershipState = {
-    val (epoch, stat) = zkClient.getControllerEpoch.get
-    new ZkMigrationLeadershipState(3000, InitialControllerEpoch, 100, InitialKRaftEpoch, Time.SYSTEM.milliseconds(), -1, epoch, stat.getVersion)
-  }
-
-  @Test
-  def testMigrateEmptyZk(): Unit = {
-    val brokers = new java.util.ArrayList[Integer]()
-    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
-
-    migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
-    assertEquals(0, brokers.size())
-    assertEquals(0, batches.size())
-  }
-
-  @Test
-  def testMigrationBrokerConfigs(): Unit = {
-    val brokers = new java.util.ArrayList[Integer]()
-    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
-
-    // Create some configs and persist in Zk.
-    val props = new Properties()
-    props.put(KafkaConfig.DefaultReplicationFactorProp, "1") // normal config
-    props.put(KafkaConfig.SslKeystorePasswordProp, encoder.encode(new Password(SECRET))) // sensitive config
-    zkClient.setOrCreateEntityConfigs(ConfigType.Broker, "1", props)
-
-    migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
-    assertEquals(0, brokers.size())
-    assertEquals(1, batches.size())
-    assertEquals(2, batches.get(0).size)
-
-    batches.get(0).forEach(record => {
-      val message = record.message().asInstanceOf[ConfigRecord]
-      val name = message.name
-      val value = message.value
-
-      assertTrue(props.containsKey(name))
-      // If the config is senstive, compare it to the decoded value.
-      if (name == KafkaConfig.SslKeystorePasswordProp) {
-        assertEquals(SECRET, value)
-      } else {
-        assertEquals(props.getProperty(name), value)
-      }
-    })
-  }
-
-  @Test
-  def testEmptyWrite(): Unit = {
-    val (zkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(), migrationState)
-    assertEquals(migrationState.migrationZkVersion(), zkVersion)
-    assertTrue(responses.isEmpty)
-  }
-
-  @Test
-  def testUpdateExistingPartitions(): Unit = {
-    // Create a topic and partition state in ZK like KafkaController would
-    val assignment = Map(
-      new TopicPartition("test", 0) -> List(0, 1, 2),
-      new TopicPartition("test", 1) -> List(1, 2, 3)
-    )
-    zkClient.createTopicAssignment("test", Some(Uuid.randomUuid()), assignment)
-
-    val leaderAndIsrs = Map(
-      new TopicPartition("test", 0) -> LeaderIsrAndControllerEpoch(
-        LeaderAndIsr(0, 5, List(0, 1, 2), LeaderRecoveryState.RECOVERED, -1), 1),
-      new TopicPartition("test", 1) -> LeaderIsrAndControllerEpoch(
-        LeaderAndIsr(1, 5, List(1, 2, 3), LeaderRecoveryState.RECOVERED, -1), 1)
-    )
-    zkClient.createTopicPartitionStatesRaw(leaderAndIsrs, 0)
-
-    // Now verify that we can update it with migration client
-    assertEquals(0, migrationState.migrationZkVersion())
-
-    val partitions = Map(
-      0 -> new PartitionRegistration(Array(0, 1, 2), Array(1, 2), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 6, -1),
-      1 -> new PartitionRegistration(Array(1, 2, 3), Array(3), Array(), Array(), 3, LeaderRecoveryState.RECOVERED, 7, -1)
-    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
-    migrationState = migrationClient.updateTopicPartitions(Map("test" -> partitions).asJava, migrationState)
-    assertEquals(1, migrationState.migrationZkVersion())
-
-    // Read back with Zk client
-    val partition0 = zkClient.getTopicPartitionState(new TopicPartition("test", 0)).get.leaderAndIsr
-    assertEquals(1, partition0.leader)
-    assertEquals(6, partition0.leaderEpoch)
-    assertEquals(List(1, 2), partition0.isr)
-
-    val partition1 = zkClient.getTopicPartitionState(new TopicPartition("test", 1)).get.leaderAndIsr
-    assertEquals(3, partition1.leader)
-    assertEquals(7, partition1.leaderEpoch)
-    assertEquals(List(3), partition1.isr)
-  }
-
-  @Test
-  def testCreateNewPartitions(): Unit = {
-    assertEquals(0, migrationState.migrationZkVersion())
-
-    val partitions = Map(
-      0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
-      1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
-    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
-    migrationState = migrationClient.createTopic("test", Uuid.randomUuid(), partitions, migrationState)
-    assertEquals(1, migrationState.migrationZkVersion())
-
-    // Read back with Zk client
-    val partition0 = zkClient.getTopicPartitionState(new TopicPartition("test", 0)).get.leaderAndIsr
-    assertEquals(0, partition0.leader)
-    assertEquals(0, partition0.leaderEpoch)
-    assertEquals(List(0, 1, 2), partition0.isr)
-
-    val partition1 = zkClient.getTopicPartitionState(new TopicPartition("test", 1)).get.leaderAndIsr
-    assertEquals(1, partition1.leader)
-    assertEquals(0, partition1.leaderEpoch)
-    assertEquals(List(1, 2, 3), partition1.isr)
-  }
-
-  @Test
-  def testIdempotentCreateTopics(): Unit = {
-    assertEquals(0, migrationState.migrationZkVersion())
-
-    val partitions = Map(
-      0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
-      1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
-    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
-    val topicId = Uuid.randomUuid()
-    migrationState = migrationClient.createTopic("test", topicId, partitions, migrationState)
-    assertEquals(1, migrationState.migrationZkVersion())
-
-    migrationState = migrationClient.createTopic("test", topicId, partitions, migrationState)
-    assertEquals(1, migrationState.migrationZkVersion())
-  }
-
-  // Write Client Quotas using ZkMigrationClient and read them back using AdminZkClient
-  private def writeClientQuotaAndVerify(migrationClient: ZkMigrationClient,
-                                        adminZkClient: AdminZkClient,
-                                        migrationState: ZkMigrationLeadershipState,
-                                        entity: Map[String, String],
-                                        quotas: Map[String, java.lang.Double],
-                                        scram: Map[String, String],
-                                        zkEntityType: String,
-                                        zkEntityName: String): ZkMigrationLeadershipState = {
-    val nextMigrationState = migrationClient.writeClientQuotas(
-      entity.asJava,
-      quotas.asJava,
-      scram.asJava,
-      migrationState)
-    val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
-      adminZkClient.fetchEntityConfig(zkEntityType, zkEntityName).asScala)
-    assertEquals(quotas, newProps)
-    nextMigrationState
-  }
-
-
-  @Test
-  def testWriteExistingClientQuotas(): Unit = {
-    val props = new Properties()
-    props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "100000")
-    adminZkClient.changeConfigs(ConfigType.User, "user1", props)
-    adminZkClient.changeConfigs(ConfigType.User, "user1/clients/clientA", props)
-
-    assertEquals(0, migrationState.migrationZkVersion())
-    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
-      Map(ClientQuotaEntity.USER -> "user1"),
-      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
-      Map.empty,
-      ConfigType.User, "user1")
-    assertEquals(1, migrationState.migrationZkVersion())
-
-    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
-      Map(ClientQuotaEntity.USER -> "user1"),
-      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
-      Map.empty,
-      ConfigType.User, "user1")
-    assertEquals(2, migrationState.migrationZkVersion())
-
-    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
-      Map(ClientQuotaEntity.USER -> "user1"),
-      Map.empty,
-      Map.empty,
-      ConfigType.User, "user1")
-    assertEquals(3, migrationState.migrationZkVersion())
-
-    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
-      Map(ClientQuotaEntity.USER -> "user1"),
-      Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
-      Map.empty,
-      ConfigType.User, "user1")
-    assertEquals(4, migrationState.migrationZkVersion())
-  }
-
-  @Test
-  def testWriteNewClientQuotas(): Unit = {
-    assertEquals(0, migrationState.migrationZkVersion())
-    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
-      Map(ClientQuotaEntity.USER -> "user2"),
-      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
-      Map.empty,
-      ConfigType.User, "user2")
-
-    assertEquals(1, migrationState.migrationZkVersion())
-
-    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
-      Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> "clientA"),
-      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
-      Map.empty,
-      ConfigType.User, "user2/clients/clientA")
-
-    assertEquals(2, migrationState.migrationZkVersion())
-  }
-
-  @Test
-  def testScram(): Unit = {
-    val random = new MockRandom()
-    def randomBuffer(random: MockRandom, length: Int): Array[Byte] = {
-        val buf = new Array[Byte](length)
-        random.nextBytes(buf)
-        buf
-    }
-    val scramCredential = new ScramCredential(
-        randomBuffer(random, 1024),
-        randomBuffer(random, 1024),
-        randomBuffer(random, 1024),
-        4096)
-
-    val props = new Properties()
-    props.put("SCRAM-SHA-256", ScramCredentialUtils.credentialToString(scramCredential))
-    adminZkClient.changeConfigs(ConfigType.User, "alice", props)
-
-    val brokers = new java.util.ArrayList[Integer]()
-    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
-
-    migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
-    assertEquals(0, brokers.size())
-    assertEquals(1, batches.size())
-    assertEquals(1, batches.get(0).size)
-  }
-
-  @Test
-  def testClaimAbsentController(): Unit = {
-    assertEquals(0, migrationState.migrationZkVersion())
-    migrationState = migrationClient.claimControllerLeadership(migrationState)
-    assertEquals(1, migrationState.zkControllerEpochZkVersion())
-  }
-
-  @Test
-  def testExistingKRaftControllerClaim(): Unit = {
-    assertEquals(0, migrationState.migrationZkVersion())
-    migrationState = migrationClient.claimControllerLeadership(migrationState)
-    assertEquals(1, migrationState.zkControllerEpochZkVersion())
-
-    // We don't require a KRaft controller to release the controller in ZK before another KRaft controller
-    // can claim it. This is because KRaft leadership comes from Raft and we are just synchronizing it to ZK.
-    var otherNodeState = ZkMigrationLeadershipState.EMPTY
-      .withNewKRaftController(3001, 43)
-      .withKRaftMetadataOffsetAndEpoch(100, 42);
-    otherNodeState = migrationClient.claimControllerLeadership(otherNodeState)
-    assertEquals(2, otherNodeState.zkControllerEpochZkVersion())
-    assertEquals(3001, otherNodeState.kraftControllerId())
-    assertEquals(43, otherNodeState.kraftControllerEpoch())
-  }
-
-  @Test
-  def testNonIncreasingKRaftEpoch(): Unit = {
-    assertEquals(0, migrationState.migrationZkVersion())
-
-    migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch)
-    migrationState = migrationClient.claimControllerLeadership(migrationState)
-    assertEquals(1, migrationState.zkControllerEpochZkVersion())
-
-    migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch - 1)
-    val t1 = assertThrows(classOf[ControllerMovedException], () => migrationClient.claimControllerLeadership(migrationState))
-    assertEquals("Cannot register KRaft controller 3001 with epoch 41 as the current controller register in ZK has the same or newer epoch 42.", t1.getMessage)
-
-    migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch)
-    val t2 = assertThrows(classOf[ControllerMovedException], () => migrationClient.claimControllerLeadership(migrationState))
-    assertEquals("Cannot register KRaft controller 3001 with epoch 42 as the current controller register in ZK has the same or newer epoch 42.", t2.getMessage)
-
-    migrationState = migrationState.withNewKRaftController(3001, 100)
-    migrationState = migrationClient.claimControllerLeadership(migrationState)
-    assertEquals(migrationState.kraftControllerEpoch(), 100)
-    assertEquals(migrationState.kraftControllerId(), 3001)
-  }
-
-  @Test
-  def testClaimAndReleaseExistingController(): Unit = {
-    assertEquals(0, migrationState.migrationZkVersion())
-
-    val (epoch, zkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(100)
-    assertEquals(epoch, 2)
-    assertEquals(zkVersion, 1)
-
-    migrationState = migrationClient.claimControllerLeadership(migrationState)
-    assertEquals(2, migrationState.zkControllerEpochZkVersion())
-    zkClient.getControllerEpoch match {
-      case Some((zkEpoch, stat)) =>
-        assertEquals(3, zkEpoch)
-        assertEquals(2, stat.getVersion)
-      case None => fail()
-    }
-    assertEquals(3000, zkClient.getControllerId.get)
-    assertThrows(classOf[ControllerMovedException], () => zkClient.registerControllerAndIncrementControllerEpoch(100))
-
-    migrationState = migrationClient.releaseControllerLeadership(migrationState)
-    val (epoch1, zkVersion1) = zkClient.registerControllerAndIncrementControllerEpoch(100)
-    assertEquals(epoch1, 4)
-    assertEquals(zkVersion1, 3)
-  }
-
-  @Test
-  def testReadAndWriteProducerId(): Unit = {
-    def generateNextProducerIdWithZkAndRead(): Long = {
-      // Generate a producer ID in ZK
-      val manager = ProducerIdManager.zk(1, zkClient)
-      manager.generateProducerId()
-
-      val records = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
-      migrationClient.migrateProducerId(batch => records.add(batch))
-      assertEquals(1, records.size())
-      assertEquals(1, records.get(0).size())
-
-      val record = records.get(0).get(0).message().asInstanceOf[ProducerIdsRecord]
-      record.nextProducerId()
-    }
-
-    // Initialize with ZK ProducerIdManager
-    assertEquals(0, generateNextProducerIdWithZkAndRead())
-
-    // Update next producer ID via migration client
-    migrationState = migrationClient.writeProducerId(6000, migrationState)
-    assertEquals(1, migrationState.migrationZkVersion())
-
-    // Switch back to ZK, it should provision the next block
-    assertEquals(7000, generateNextProducerIdWithZkAndRead())
-  }
-
-  @Test
-  def testMigrateTopicConfigs(): Unit = {
-    val props = new Properties()
-    props.put(TopicConfig.FLUSH_MS_CONFIG, "60000")
-    props.put(TopicConfig.RETENTION_MS_CONFIG, "300000")
-    adminZkClient.createTopicWithAssignment("test", props, Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 0), 2 -> Seq(2, 0, 1)), usesTopicId = true)
-
-    val brokers = new java.util.ArrayList[Integer]()
-    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
-    migrationClient.migrateTopics(batch => batches.add(batch), brokerId => brokers.add(brokerId))
-    assertEquals(1, batches.size())
-    val configs = batches.get(0)
-      .asScala
-      .map {_.message()}
-      .filter(message => MetadataRecordType.fromId(message.apiKey()).equals(MetadataRecordType.CONFIG_RECORD))
-      .map {_.asInstanceOf[ConfigRecord]}
-      .toSeq
-    assertEquals(2, configs.size)
-    assertEquals(TopicConfig.FLUSH_MS_CONFIG, configs.head.name())
-    assertEquals("60000", configs.head.value())
-    assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
-    assertEquals("300000", configs.last.value())
-  }
-
-  @Test
-  def testWriteNewTopicConfigs(): Unit = {
-    migrationState = migrationClient.writeConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "test"),
-      java.util.Collections.singletonMap(TopicConfig.SEGMENT_MS_CONFIG, "100000"), migrationState)
-    assertEquals(1, migrationState.migrationZkVersion())
-
-    val newProps = zkClient.getEntityConfigs(ConfigType.Topic, "test")
-    assertEquals(1, newProps.size())
-    assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
-  }
-
-  @Test
-  def testWriteExistingTopicConfigs(): Unit = {
-    val props = new Properties()
-    props.put(TopicConfig.FLUSH_MS_CONFIG, "60000")
-    props.put(TopicConfig.RETENTION_MS_CONFIG, "300000")
-    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, "test", props)
-
-    migrationState = migrationClient.writeConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "test"),
-      java.util.Collections.singletonMap(TopicConfig.SEGMENT_MS_CONFIG, "100000"), migrationState)
-    assertEquals(1, migrationState.migrationZkVersion())
-
-    val newProps = zkClient.getEntityConfigs(ConfigType.Topic, "test")
-    assertEquals(1, newProps.size())
-    assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
-  }
-
-  def migrateAclsAndVerify(authorizer: AclAuthorizer, acls: Seq[AclBinding]): Unit = {
-    authorizer.createAcls(null, acls.asJava)
-    val batches = new ArrayBuffer[mutable.Buffer[ApiMessageAndVersion]]()
-    migrationClient.migrateAcls(batch => batches.append(batch.asScala))
-    val records = batches.flatten.map(_.message().asInstanceOf[AccessControlEntryRecord])
-    assertEquals(acls.size, records.size, "Expected one record for each ACLBinding")
-  }
-
-  def writeAclAndReadWithAuthorizer(
-    authorizer: AclAuthorizer,
-    resourcePattern: ResourcePattern,
-    ace: AccessControlEntry,
-    pred: Seq[AclBinding] => Boolean
-  ): Seq[AclBinding] = {
-    val resourceFilter = new AclBindingFilter(
-      new ResourcePatternFilter(resourcePattern.resourceType(), resourcePattern.name(), resourcePattern.patternType()),
-      AclBindingFilter.ANY.entryFilter()
-    )
-    migrationState = migrationClient.writeAddedAcls(resourcePattern, List(ace).asJava, migrationState)
-    val (acls, ok) = TestUtils.computeUntilTrue(authorizer.acls(resourceFilter).asScala.toSeq)(pred)
-    assertTrue(ok)
-    acls
-  }
-
-  def deleteAclAndReadWithAuthorizer(
-    authorizer: AclAuthorizer,
-    resourcePattern: ResourcePattern,
-    ace: AccessControlEntry,
-    pred: Seq[AclBinding] => Boolean
-  ): Seq[AclBinding] = {
-    val resourceFilter = new AclBindingFilter(
-      new ResourcePatternFilter(resourcePattern.resourceType(), resourcePattern.name(), resourcePattern.patternType()),
-      AclBindingFilter.ANY.entryFilter()
-    )
-    migrationState = migrationClient.removeDeletedAcls(resourcePattern, List(ace).asJava, migrationState)
-    val (acls, ok) = TestUtils.computeUntilTrue(authorizer.acls(resourceFilter).asScala.toSeq)(pred)
-    assertTrue(ok)
-    acls
-  }
-
-  @Test
-  def testAclsMigrateAndDualWrite(): Unit = {
-    val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + UUID.randomUUID(), PatternType.LITERAL)
-    val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + UUID.randomUUID(), PatternType.LITERAL)
-    val prefixedResource = new ResourcePattern(ResourceType.TOPIC, "bar-", PatternType.PREFIXED)
-    val username = "alice"
-    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
-    val wildcardPrincipal = SecurityUtils.parseKafkaPrincipal(WildcardPrincipalString)
-
-    val ace1 = new AccessControlEntry(principal.toString, WildcardHost, AclOperation.READ, AclPermissionType.ALLOW)
-    val acl1 = new AclBinding(resource1, ace1)
-    val ace2 = new AccessControlEntry(principal.toString, "192.168.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
-    val acl2 = new AclBinding(resource1, ace2)
-    val acl3 = new AclBinding(resource2, new AccessControlEntry(principal.toString, WildcardHost, AclOperation.DESCRIBE, AclPermissionType.ALLOW))
-    val acl4 = new AclBinding(prefixedResource, new AccessControlEntry(wildcardPrincipal.toString, WildcardHost, AclOperation.READ, AclPermissionType.ALLOW))
-
-    val authorizer = new AclAuthorizer()
-    try {
-      authorizer.configure(Map("zookeeper.connect" -> this.zkConnect).asJava)
-
-      // Migrate ACLs
-      migrateAclsAndVerify(authorizer, Seq(acl1, acl2, acl3, acl4))
-
-      // Delete one of resource1's ACLs
-      var resource1Acls = deleteAclAndReadWithAuthorizer(authorizer, resource1, ace2, acls => acls.size == 1)
-      assertEquals(acl1, resource1Acls.head)
-
-      // Delete the other ACL from resource1
-      deleteAclAndReadWithAuthorizer(authorizer, resource1, ace1, acls => acls.isEmpty)
-
-      // Add a new ACL for resource1
-      val newAce1 = new AccessControlEntry(principal.toString, "10.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
-      resource1Acls = writeAclAndReadWithAuthorizer(authorizer, resource1, newAce1, acls => acls.size == 1)
-      assertEquals(newAce1, resource1Acls.head.entry())
-
-      // Add a new ACL for resource2
-      val newAce2 = new AccessControlEntry(principal.toString, "10.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
-      val resource2Acls = writeAclAndReadWithAuthorizer(authorizer, resource2, newAce2, acls => acls.size == 2)
-      assertEquals(acl3, resource2Acls.head)
-      assertEquals(newAce2, resource2Acls.last.entry())
-    } finally {
-      authorizer.close()
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
new file mode 100644
index 00000000000..6df29f651a2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkAclMigrationClientTest.scala
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk.migration
+
+import kafka.security.authorizer.AclAuthorizer
+import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.acl._
+import org.apache.kafka.common.metadata.AccessControlEntryRecord
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+import java.util.UUID
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+class ZkAclMigrationClientTest extends ZkMigrationTestHarness {
+  def migrateAclsAndVerify(authorizer: AclAuthorizer, acls: Seq[AclBinding]): Unit = {
+    authorizer.createAcls(null, acls.asJava)
+    val batches = new mutable.ArrayBuffer[mutable.Buffer[ApiMessageAndVersion]]()
+    migrationClient.migrateAcls(batch => batches.append(batch.asScala))
+    val records = batches.flatten.map(_.message().asInstanceOf[AccessControlEntryRecord])
+    assertEquals(acls.size, records.size, "Expected one record for each ACLBinding")
+  }
+
+  def replaceAclsAndReadWithAuthorizer(
+    authorizer: AclAuthorizer,
+    resourcePattern: ResourcePattern,
+    aces: Seq[AccessControlEntry],
+    pred: Seq[AclBinding] => Boolean
+  ): Seq[AclBinding] = {
+    val resourceFilter = new AclBindingFilter(
+      new ResourcePatternFilter(resourcePattern.resourceType(), resourcePattern.name(), resourcePattern.patternType()),
+      AclBindingFilter.ANY.entryFilter()
+    )
+    migrationState = migrationClient.aclClient().writeResourceAcls(resourcePattern, aces.asJava, migrationState)
+    val (acls, ok) = TestUtils.computeUntilTrue(authorizer.acls(resourceFilter).asScala.toSeq)(pred)
+    assertTrue(ok)
+    acls
+  }
+
+  def deleteResourceAndReadWithAuthorizer(
+    authorizer: AclAuthorizer,
+    resourcePattern: ResourcePattern
+  ): Unit = {
+    val resourceFilter = new AclBindingFilter(
+      new ResourcePatternFilter(resourcePattern.resourceType(), resourcePattern.name(), resourcePattern.patternType()),
+      AclBindingFilter.ANY.entryFilter()
+    )
+    migrationState = migrationClient.aclClient().deleteResource(resourcePattern, migrationState)
+    val (_, ok) = TestUtils.computeUntilTrue(authorizer.acls(resourceFilter).asScala.toSeq)(_.isEmpty)
+    assertTrue(ok)
+  }
+
+
+  @Test
+  def testAclsMigrateAndDualWrite(): Unit = {
+    val resource1 = new ResourcePattern(ResourceType.TOPIC, "foo-" + UUID.randomUUID(), PatternType.LITERAL)
+    val resource2 = new ResourcePattern(ResourceType.TOPIC, "bar-" + UUID.randomUUID(), PatternType.LITERAL)
+    val prefixedResource = new ResourcePattern(ResourceType.TOPIC, "bar-", PatternType.PREFIXED)
+    val username = "alice"
+    val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+    val wildcardPrincipal = SecurityUtils.parseKafkaPrincipal(WildcardPrincipalString)
+
+    val ace1 = new AccessControlEntry(principal.toString, WildcardHost, AclOperation.READ, AclPermissionType.ALLOW)
+    val acl1 = new AclBinding(resource1, ace1)
+    val ace2 = new AccessControlEntry(principal.toString, "192.168.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
+    val acl2 = new AclBinding(resource1, ace2)
+    val acl3 = new AclBinding(resource2, new AccessControlEntry(principal.toString, WildcardHost, AclOperation.DESCRIBE, AclPermissionType.ALLOW))
+    val acl4 = new AclBinding(prefixedResource, new AccessControlEntry(wildcardPrincipal.toString, WildcardHost, AclOperation.READ, AclPermissionType.ALLOW))
+
+    val authorizer = new AclAuthorizer()
+    try {
+      authorizer.configure(Map("zookeeper.connect" -> this.zkConnect).asJava)
+
+      // Migrate ACLs
+      migrateAclsAndVerify(authorizer, Seq(acl1, acl2, acl3, acl4))
+
+      // Remove one of resource1's ACLs
+      var resource1Acls = replaceAclsAndReadWithAuthorizer(authorizer, resource1, Seq(ace1), acls => acls.size == 1)
+      assertEquals(acl1, resource1Acls.head)
+
+      // Delete the other ACL from resource1
+      deleteResourceAndReadWithAuthorizer(authorizer, resource1)
+
+      // Add a new ACL for resource1
+      val newAce1 = new AccessControlEntry(principal.toString, "10.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
+      resource1Acls = replaceAclsAndReadWithAuthorizer(authorizer, resource1, Seq(newAce1), acls => acls.size == 1)
+      assertEquals(newAce1, resource1Acls.head.entry())
+
+      // Add a new ACL for resource2
+      val newAce2 = new AccessControlEntry(principal.toString, "10.0.0.1", AclOperation.WRITE, AclPermissionType.ALLOW)
+      val resource2Acls = replaceAclsAndReadWithAuthorizer(authorizer, resource2, Seq(acl3.entry(), newAce2), acls => acls.size == 2)
+      assertEquals(acl3, resource2Acls.head)
+      assertEquals(newAce2, resource2Acls.last.entry())
+    } finally {
+      authorizer.close()
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
new file mode 100644
index 00000000000..7313c321fe3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk.migration
+
+import kafka.server.{ConfigType, KafkaConfig, ZkAdminManager}
+import kafka.zk.{AdminZkClient, ZkMigrationClient}
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.metadata.ConfigRecord
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.security.scram.ScramCredential
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
+import org.apache.kafka.image.{ClientQuotasDelta, ClientQuotasImage}
+import org.apache.kafka.metadata.RecordTestUtils
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.util.MockRandom
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+import java.util.Properties
+import scala.collection.Map
+import scala.jdk.CollectionConverters._
+
+class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
+  @Test
+  def testMigrationBrokerConfigs(): Unit = {
+    val brokers = new java.util.ArrayList[Integer]()
+    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+
+    // Create some configs and persist in Zk.
+    val props = new Properties()
+    props.put(KafkaConfig.DefaultReplicationFactorProp, "1") // normal config
+    props.put(KafkaConfig.SslKeystorePasswordProp, encoder.encode(new Password(SECRET))) // sensitive config
+    zkClient.setOrCreateEntityConfigs(ConfigType.Broker, "1", props)
+
+    migrationClient.migrateBrokerConfigs(batch => batches.add(batch), brokerId => brokers.add(brokerId))
+    assertEquals(1, brokers.size())
+    assertEquals(1, batches.size())
+    assertEquals(2, batches.get(0).size)
+
+    batches.get(0).forEach(record => {
+      val message = record.message().asInstanceOf[ConfigRecord]
+      val name = message.name
+      val value = message.value
+
+      assertTrue(props.containsKey(name))
+      // If the config is senstive, compare it to the decoded value.
+      if (name == KafkaConfig.SslKeystorePasswordProp) {
+        assertEquals(SECRET, value)
+      } else {
+        assertEquals(props.getProperty(name), value)
+      }
+    })
+
+    migrationState = migrationClient.configClient().deleteConfigs(
+      new ConfigResource(ConfigResource.Type.BROKER, "1"), migrationState)
+    assertEquals(0, zkClient.getEntityConfigs(ConfigType.Broker, "1").size())
+  }
+
+  @Test
+  def testMigrateClientQuotas(): Unit = {
+    val props = new Properties()
+    props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "100000")
+    adminZkClient.changeConfigs(ConfigType.User, "<default>", props)
+    adminZkClient.changeConfigs(ConfigType.User, "user1", props)
+    adminZkClient.changeConfigs(ConfigType.User, "user1/clients/clientA", props)
+    adminZkClient.changeConfigs(ConfigType.User, "<default>/clients/<default>", props)
+    adminZkClient.changeConfigs(ConfigType.User, "<default>/clients/clientA", props)
+    adminZkClient.changeConfigs(ConfigType.Client, "<default>", props)
+    adminZkClient.changeConfigs(ConfigType.Client, "clientB", props)
+    props.remove(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG)
+    props.put(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, "10")
+    adminZkClient.changeConfigs(ConfigType.Ip, "1.1.1.1", props)
+    adminZkClient.changeConfigs(ConfigType.Ip, "<default>", props)
+
+    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+    migrationClient.migrateClientQuotas(batch => batches.add(batch))
+
+    assertEquals(9, batches.size())
+    val delta = new ClientQuotasDelta(ClientQuotasImage.EMPTY)
+    RecordTestUtils.replayAllBatches(delta, batches)
+    val image = delta.apply()
+
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "").asJava)))
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1").asJava)))
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava)))
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "").asJava)))
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "clientA").asJava)))
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "").asJava)))
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "clientB").asJava)))
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava)))
+    assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "").asJava)))
+  }
+
+  @Test
+  def testWriteExistingClientQuotas(): Unit = {
+    val props = new Properties()
+    props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "100000")
+    adminZkClient.changeConfigs(ConfigType.User, "user1", props)
+    adminZkClient.changeConfigs(ConfigType.User, "user1/clients/clientA", props)
+
+    assertEquals(0, migrationState.migrationZkVersion())
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ClientQuotaEntity.USER -> "user1"),
+      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
+      ConfigType.User, "user1")
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ClientQuotaEntity.USER -> "user1"),
+      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
+      ConfigType.User, "user1")
+    assertEquals(2, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ClientQuotaEntity.USER -> "user1"),
+      Map.empty,
+      ConfigType.User, "user1")
+    assertEquals(3, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ClientQuotaEntity.USER -> "user1"),
+      Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
+      ConfigType.User, "user1")
+    assertEquals(4, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ClientQuotaEntity.USER -> ""),
+      Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
+      ConfigType.User, "<default>")
+    assertEquals(5, migrationState.migrationZkVersion())
+  }
+
+  // Write Client Quotas using ZkMigrationClient and read them back using AdminZkClient
+  private def writeClientQuotaAndVerify(
+    migrationClient: ZkMigrationClient,
+    adminZkClient: AdminZkClient,
+    migrationState: ZkMigrationLeadershipState,
+    entity: Map[String, String],
+    quotas: Map[String, java.lang.Double],
+    zkEntityType: String,
+    zkEntityName: String
+  ): ZkMigrationLeadershipState = {
+    val nextMigrationState = migrationClient.configClient().writeClientQuotas(
+      entity.asJava,
+      quotas.asJava,
+      Map.empty[String, String].asJava,
+      migrationState)
+    val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
+      adminZkClient.fetchEntityConfig(zkEntityType, zkEntityName).asScala)
+    assertEquals(quotas, newProps)
+    nextMigrationState
+  }
+
+  @Test
+  def testWriteNewClientQuotas(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ClientQuotaEntity.USER -> "user2"),
+      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
+      ConfigType.User, "user2")
+
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> "clientA"),
+      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
+      ConfigType.User, "user2/clients/clientA")
+
+    assertEquals(2, migrationState.migrationZkVersion())
+  }
+
+  @Test
+  def testWriteNewTopicConfigs(): Unit = {
+    migrationState = migrationClient.configClient().writeConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "test"),
+      java.util.Collections.singletonMap(TopicConfig.SEGMENT_MS_CONFIG, "100000"), migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    val newProps = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+    assertEquals(1, newProps.size())
+    assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
+  }
+
+  @Test
+  def testWriteExistingTopicConfigs(): Unit = {
+    val props = new Properties()
+    props.put(TopicConfig.FLUSH_MS_CONFIG, "60000")
+    props.put(TopicConfig.RETENTION_MS_CONFIG, "300000")
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, "test", props)
+
+    migrationState = migrationClient.configClient().writeConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "test"),
+      java.util.Collections.singletonMap(TopicConfig.SEGMENT_MS_CONFIG, "100000"), migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    val newProps = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+    assertEquals(1, newProps.size())
+    assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
+  }
+
+  @Test
+  def testScram(): Unit = {
+    val random = new MockRandom()
+
+    def randomBuffer(random: MockRandom, length: Int): Array[Byte] = {
+      val buf = new Array[Byte](length)
+      random.nextBytes(buf)
+      buf
+    }
+
+    val scramCredential = new ScramCredential(
+      randomBuffer(random, 1024),
+      randomBuffer(random, 1024),
+      randomBuffer(random, 1024),
+      4096)
+
+    val props = new Properties()
+    props.put("SCRAM-SHA-256", ScramCredentialUtils.credentialToString(scramCredential))
+    adminZkClient.changeConfigs(ConfigType.User, "alice", props)
+
+    val brokers = new java.util.ArrayList[Integer]()
+    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+
+    migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
+    assertEquals(0, brokers.size())
+    assertEquals(1, batches.size())
+    assertEquals(1, batches.get(0).size)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
new file mode 100644
index 00000000000..7d7dae9f893
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.coordinator.transaction.ProducerIdManager
+import kafka.zk.migration.ZkMigrationTestHarness
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, ProducerIdsRecord}
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
+import org.junit.jupiter.api.Test
+
+import java.util.Properties
+import scala.collection.Map
+import scala.jdk.CollectionConverters._
+
+/**
+ * ZooKeeper integration tests that verify the interoperability of KafkaZkClient and ZkMigrationClient.
+ */
+class ZkMigrationClientTest extends ZkMigrationTestHarness {
+
+  @Test
+  def testMigrateEmptyZk(): Unit = {
+    val brokers = new java.util.ArrayList[Integer]()
+    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+
+    migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
+    assertEquals(0, brokers.size())
+    assertEquals(0, batches.size())
+  }
+
+  @Test
+  def testEmptyWrite(): Unit = {
+    val (zkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(), migrationState)
+    assertEquals(migrationState.migrationZkVersion(), zkVersion)
+    assertTrue(responses.isEmpty)
+  }
+
+  @Test
+  def testUpdateExistingPartitions(): Unit = {
+    // Create a topic and partition state in ZK like KafkaController would
+    val assignment = Map(
+      new TopicPartition("test", 0) -> List(0, 1, 2),
+      new TopicPartition("test", 1) -> List(1, 2, 3)
+    )
+    zkClient.createTopicAssignment("test", Some(Uuid.randomUuid()), assignment)
+
+    val leaderAndIsrs = Map(
+      new TopicPartition("test", 0) -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(0, 5, List(0, 1, 2), LeaderRecoveryState.RECOVERED, -1), 1),
+      new TopicPartition("test", 1) -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(1, 5, List(1, 2, 3), LeaderRecoveryState.RECOVERED, -1), 1)
+    )
+    zkClient.createTopicPartitionStatesRaw(leaderAndIsrs, 0)
+
+    // Now verify that we can update it with migration client
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val partitions = Map(
+      0 -> new PartitionRegistration(Array(0, 1, 2), Array(1, 2), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 6, -1),
+      1 -> new PartitionRegistration(Array(1, 2, 3), Array(3), Array(), Array(), 3, LeaderRecoveryState.RECOVERED, 7, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    migrationState = migrationClient.topicClient().updateTopicPartitions(Map("test" -> partitions).asJava, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    // Read back with Zk client
+    val partition0 = zkClient.getTopicPartitionState(new TopicPartition("test", 0)).get.leaderAndIsr
+    assertEquals(1, partition0.leader)
+    assertEquals(6, partition0.leaderEpoch)
+    assertEquals(List(1, 2), partition0.isr)
+
+    val partition1 = zkClient.getTopicPartitionState(new TopicPartition("test", 1)).get.leaderAndIsr
+    assertEquals(3, partition1.leader)
+    assertEquals(7, partition1.leaderEpoch)
+    assertEquals(List(3), partition1.isr)
+
+    // Delete whole topic
+    migrationState = migrationClient.topicClient().deleteTopic("test", migrationState)
+    assertEquals(2, migrationState.migrationZkVersion())
+  }
+
+  @Test
+  def testCreateNewPartitions(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val partitions = Map(
+      0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+      1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    migrationState = migrationClient.topicClient().createTopic("test", Uuid.randomUuid(), partitions, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    // Read back with Zk client
+    val partition0 = zkClient.getTopicPartitionState(new TopicPartition("test", 0)).get.leaderAndIsr
+    assertEquals(0, partition0.leader)
+    assertEquals(0, partition0.leaderEpoch)
+    assertEquals(List(0, 1, 2), partition0.isr)
+
+    val partition1 = zkClient.getTopicPartitionState(new TopicPartition("test", 1)).get.leaderAndIsr
+    assertEquals(1, partition1.leader)
+    assertEquals(0, partition1.leaderEpoch)
+    assertEquals(List(1, 2, 3), partition1.isr)
+  }
+
+  @Test
+  def testIdempotentCreateTopics(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val partitions = Map(
+      0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+      1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    val topicId = Uuid.randomUuid()
+    migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+  }
+
+  @Test
+  def testClaimAbsentController(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(1, migrationState.zkControllerEpochZkVersion())
+  }
+
+  @Test
+  def testExistingKRaftControllerClaim(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(1, migrationState.zkControllerEpochZkVersion())
+
+    // We don't require a KRaft controller to release the controller in ZK before another KRaft controller
+    // can claim it. This is because KRaft leadership comes from Raft and we are just synchronizing it to ZK.
+    var otherNodeState = ZkMigrationLeadershipState.EMPTY
+      .withNewKRaftController(3001, 43)
+      .withKRaftMetadataOffsetAndEpoch(100, 42);
+    otherNodeState = migrationClient.claimControllerLeadership(otherNodeState)
+    assertEquals(2, otherNodeState.zkControllerEpochZkVersion())
+    assertEquals(3001, otherNodeState.kraftControllerId())
+    assertEquals(43, otherNodeState.kraftControllerEpoch())
+  }
+
+  @Test
+  def testNonIncreasingKRaftEpoch(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch)
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(1, migrationState.zkControllerEpochZkVersion())
+
+    migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch - 1)
+    val t1 = assertThrows(classOf[ControllerMovedException], () => migrationClient.claimControllerLeadership(migrationState))
+    assertEquals("Cannot register KRaft controller 3001 with epoch 41 as the current controller register in ZK has the same or newer epoch 42.", t1.getMessage)
+
+    migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch)
+    val t2 = assertThrows(classOf[ControllerMovedException], () => migrationClient.claimControllerLeadership(migrationState))
+    assertEquals("Cannot register KRaft controller 3001 with epoch 42 as the current controller register in ZK has the same or newer epoch 42.", t2.getMessage)
+
+    migrationState = migrationState.withNewKRaftController(3001, 100)
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(migrationState.kraftControllerEpoch(), 100)
+    assertEquals(migrationState.kraftControllerId(), 3001)
+  }
+
+  @Test
+  def testClaimAndReleaseExistingController(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val (epoch, zkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(100)
+    assertEquals(epoch, 2)
+    assertEquals(zkVersion, 1)
+
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(2, migrationState.zkControllerEpochZkVersion())
+    zkClient.getControllerEpoch match {
+      case Some((zkEpoch, stat)) =>
+        assertEquals(3, zkEpoch)
+        assertEquals(2, stat.getVersion)
+      case None => fail()
+    }
+    assertEquals(3000, zkClient.getControllerId.get)
+    assertThrows(classOf[ControllerMovedException], () => zkClient.registerControllerAndIncrementControllerEpoch(100))
+
+    migrationState = migrationClient.releaseControllerLeadership(migrationState)
+    val (epoch1, zkVersion1) = zkClient.registerControllerAndIncrementControllerEpoch(100)
+    assertEquals(epoch1, 4)
+    assertEquals(zkVersion1, 3)
+  }
+
+  @Test
+  def testReadAndWriteProducerId(): Unit = {
+    def generateNextProducerIdWithZkAndRead(): Long = {
+      // Generate a producer ID in ZK
+      val manager = ProducerIdManager.zk(1, zkClient)
+      manager.generateProducerId()
+
+      val records = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+      migrationClient.migrateProducerId(batch => records.add(batch))
+      assertEquals(1, records.size())
+      assertEquals(1, records.get(0).size())
+
+      val record = records.get(0).get(0).message().asInstanceOf[ProducerIdsRecord]
+      record.nextProducerId()
+    }
+
+    // Initialize with ZK ProducerIdManager
+    assertEquals(0, generateNextProducerIdWithZkAndRead())
+
+    // Update next producer ID via migration client
+    migrationState = migrationClient.writeProducerId(6000, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    // Switch back to ZK, it should provision the next block
+    assertEquals(7000, generateNextProducerIdWithZkAndRead())
+  }
+
+  @Test
+  def testMigrateTopicConfigs(): Unit = {
+    val props = new Properties()
+    props.put(TopicConfig.FLUSH_MS_CONFIG, "60000")
+    props.put(TopicConfig.RETENTION_MS_CONFIG, "300000")
+    adminZkClient.createTopicWithAssignment("test", props, Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 0), 2 -> Seq(2, 0, 1)), usesTopicId = true)
+
+    val brokers = new java.util.ArrayList[Integer]()
+    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+    migrationClient.migrateTopics(batch => batches.add(batch), brokerId => brokers.add(brokerId))
+    assertEquals(1, batches.size())
+    val configs = batches.get(0)
+      .asScala
+      .map {_.message() }
+      .filter(message => MetadataRecordType.fromId(message.apiKey()).equals(MetadataRecordType.CONFIG_RECORD))
+      .map { _.asInstanceOf[ConfigRecord] }
+      .toSeq
+    assertEquals(2, configs.size)
+    assertEquals(TopicConfig.FLUSH_MS_CONFIG, configs.head.name())
+    assertEquals("60000", configs.head.value())
+    assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
+    assertEquals("300000", configs.last.value())
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
new file mode 100644
index 00000000000..321903ed9ce
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk.migration
+
+import kafka.server.{KafkaConfig, QuorumTestHarness}
+import kafka.utils.PasswordEncoder
+import kafka.zk.ZkMigrationClient
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
+
+import java.util.Properties
+
+class ZkMigrationTestHarness extends QuorumTestHarness {
+  val InitialControllerEpoch: Int = 42
+
+  val InitialKRaftEpoch: Int = 0
+
+  var migrationClient: ZkMigrationClient = _
+
+  var migrationState: ZkMigrationLeadershipState = _
+
+  val SECRET = "secret"
+
+  val encoder: PasswordEncoder = {
+    val encoderProps = new Properties()
+    encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation
+    encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the
+    val encoderConfig = new KafkaConfig(encoderProps)
+    PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get,
+      encoderConfig.passwordEncoderKeyFactoryAlgorithm,
+      encoderConfig.passwordEncoderCipherAlgorithm,
+      encoderConfig.passwordEncoderKeyLength,
+      encoderConfig.passwordEncoderIterations)
+  }
+
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
+    zkClient.createControllerEpochRaw(1)
+    migrationClient = ZkMigrationClient(zkClient, encoder)
+    migrationState = initialMigrationState
+    migrationState = migrationClient.getOrCreateMigrationRecoveryState(migrationState)
+  }
+
+  private def initialMigrationState: ZkMigrationLeadershipState = {
+    val (epoch, stat) = zkClient.getControllerEpoch.get
+    new ZkMigrationLeadershipState(3000, InitialControllerEpoch, 100, InitialKRaftEpoch, Time.SYSTEM.milliseconds(), -1, epoch, stat.getVersion)
+  }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
index a255e3ad8f1..cf2bb75ddbf 100644
--- a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
@@ -25,10 +25,12 @@ import org.apache.kafka.metadata.authorizer.StandardAclWithId;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 
@@ -38,6 +40,7 @@ import java.util.stream.Collectors;
 public final class AclsDelta {
     private final AclsImage image;
     private final Map<Uuid, Optional<StandardAcl>> changes = new LinkedHashMap<>();
+    private final Set<StandardAcl> deleted = new HashSet<>();
     private boolean isSnapshotDelta = false;
 
     public AclsDelta(AclsImage image) {
@@ -54,6 +57,15 @@ public final class AclsDelta {
         return changes;
     }
 
+    /**
+     * Return a Set of the ACLs which were deleted in this delta. This is used by the ZK migration components.
+     *
+     * @return Set of deleted ACLs
+     */
+    public Set<StandardAcl> deleted() {
+        return deleted;
+    }
+
     void finishSnapshot() {
         this.isSnapshotDelta = true;
     }
@@ -82,8 +94,10 @@ public final class AclsDelta {
     public void replay(RemoveAccessControlEntryRecord record) {
         if (image.acls().containsKey(record.id())) {
             changes.put(record.id(), Optional.empty());
+            deleted.add(image.acls().get(record.id()));
         } else if (changes.containsKey(record.id())) {
             changes.remove(record.id());
+            // No need to track a ACL that was added and deleted within the same delta
         } else {
             throw new IllegalStateException("Failed to find existing ACL with ID " + record.id() + " in either image or changes");
         }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
index 101594615fb..6762391619b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
@@ -48,7 +48,7 @@ public final class ConfigurationsImage {
         return data.isEmpty();
     }
 
-    Map<ConfigResource, ConfigurationImage> resourceData() {
+    public Map<ConfigResource, ConfigurationImage> resourceData() {
         return data;
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/AclMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/AclMigrationClient.java
new file mode 100644
index 00000000000..0b2b368bc2d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/AclMigrationClient.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.resource.ResourcePattern;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+public interface AclMigrationClient {
+    ZkMigrationLeadershipState deleteResource(
+        ResourcePattern resourcePattern,
+        ZkMigrationLeadershipState state
+    );
+
+    ZkMigrationLeadershipState writeResourceAcls(
+        ResourcePattern resourcePattern,
+        Collection<AccessControlEntry> aclsToWrite,
+        ZkMigrationLeadershipState state
+    );
+
+    void iterateAcls(BiConsumer<ResourcePattern, Set<AccessControlEntry>> aclConsumer);
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java
new file mode 100644
index 00000000000..9a7b486ff11
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ConfigMigrationClient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.security.scram.ScramCredential;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+public interface ConfigMigrationClient {
+
+    interface ClientQuotaVisitor {
+        void visitClientQuota(List<ClientQuotaRecord.EntityData> entityDataList, Map<String, Double> quotas);
+
+        void visitScramCredential(String userName, ScramMechanism scramMechanism, ScramCredential scramCredential);
+    }
+
+    void iterateClientQuotas(ClientQuotaVisitor visitor);
+
+    void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer);
+
+    ZkMigrationLeadershipState writeConfigs(
+        ConfigResource configResource,
+        Map<String, String> configMap,
+        ZkMigrationLeadershipState state
+    );
+
+    ZkMigrationLeadershipState writeClientQuotas(
+        Map<String, String> clientQuotaEntity,
+        Map<String, Double> quotas,
+        Map<String, String> scram,
+        ZkMigrationLeadershipState state
+    );
+
+    ZkMigrationLeadershipState deleteConfigs(
+        ConfigResource configResource,
+        ZkMigrationLeadershipState state
+    );
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 5a02bbf6c0d..caa030abbce 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -16,14 +16,10 @@
  */
 package org.apache.kafka.metadata.migration;
 
-import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.MetadataRecordType;
-import org.apache.kafka.common.quota.ClientQuotaEntity;
-import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
 import org.apache.kafka.controller.QuorumFeatures;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -32,8 +28,6 @@ import org.apache.kafka.image.loader.LoaderManifest;
 import org.apache.kafka.image.loader.LoaderManifestType;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.metadata.BrokerRegistration;
-import org.apache.kafka.metadata.authorizer.StandardAcl;
-import org.apache.kafka.metadata.ScramCredentialData;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.LeaderAndEpoch;
@@ -44,13 +38,9 @@ import org.apache.kafka.server.util.Deadline;
 import org.apache.kafka.server.util.FutureUtils;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -58,7 +48,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -83,6 +72,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     private final Logger log;
     private final int nodeId;
     private final MigrationClient zkMigrationClient;
+    private final KRaftMigrationZkWriter zkMetadataWriter;
     private final LegacyPropagator propagator;
     private final ZkRecordConsumer zkRecordConsumer;
     private final KafkaEventQueue eventQueue;
@@ -125,6 +115,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.initialZkLoadHandler = initialZkLoadHandler;
         this.faultHandler = faultHandler;
         this.quorumFeatures = quorumFeatures;
+        this.zkMetadataWriter = new KRaftMigrationZkWriter(zkMigrationClient, this::applyMigrationOperation);
     }
 
     public KRaftMigrationDriver(
@@ -159,7 +150,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
 
     private void recoverMigrationStateFromZK() {
         log.info("Recovering migration state from ZK");
-        apply("Recovery", zkMigrationClient::getOrCreateMigrationRecoveryState);
+        applyMigrationOperation("Recovery", zkMigrationClient::getOrCreateMigrationRecoveryState);
         String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
         log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
 
@@ -215,9 +206,14 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
 
         // Once all of those are found, check the topic assignments. This is much more expensive than listing /brokers
-        Set<Integer> zkBrokersWithAssignments = zkMigrationClient.readBrokerIdsFromTopicAssignments();
+        Set<Integer> zkBrokersWithAssignments = new HashSet<>();
+        zkMigrationClient.topicClient().iterateTopics(
+            EnumSet.of(TopicMigrationClient.TopicVisitorInterest.TOPICS),
+            (topicName, topicId, assignments) -> assignments.values().forEach(zkBrokersWithAssignments::addAll)
+        );
+
         if (imageDoesNotContainAllBrokers(image, zkBrokersWithAssignments)) {
-            log.info("Still waiting for ZK brokers {} to register with KRaft.", zkBrokersWithAssignments);
+            log.info("Still waiting for ZK brokers {} found in metadata to register with KRaft.", zkBrokersWithAssignments);
             return false;
         }
 
@@ -227,13 +223,20 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     /**
      * Apply a function which transforms our internal migration state.
      *
-     * @param name  A descriptive name of the function that is being applied
-     * @param stateMutator  A function which performs some migration operations and possibly transforms our internal state
+     * @param name         A descriptive name of the function that is being applied
+     * @param migrationOp  A function which performs some migration operations and possibly transforms our internal state
      */
-    private void apply(String name, Function<ZkMigrationLeadershipState, ZkMigrationLeadershipState> stateMutator) {
+    private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp) {
         ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
-        ZkMigrationLeadershipState afterState = stateMutator.apply(beforeState);
-        log.trace("{} transitioned from {} to {}", name, beforeState, afterState);
+        ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState);
+        if (afterState.loggableChangeSinceState(beforeState)) {
+            log.info("{} transitioned migration state from {} to {}", name, beforeState, afterState);
+        } else if (afterState.equals(beforeState)) {
+            log.trace("{} kept migration state as {}", name, afterState);
+        } else {
+            log.trace("{} transitioned migration state from {} to {}", name, beforeState, afterState);
+
+        }
         this.migrationLeadershipState = afterState;
     }
 
@@ -426,7 +429,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             boolean isActive = leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId);
 
             if (!isActive) {
-                apply("KRaftLeaderEvent is not active", state ->
+                applyMigrationOperation("KRaftLeaderEvent is not active", state ->
                     state.withNewKRaftController(
                         leaderAndEpoch.leaderId().orElse(ZkMigrationLeadershipState.EMPTY.kraftControllerId()),
                         leaderAndEpoch.epoch())
@@ -434,7 +437,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 transitionTo(MigrationDriverState.INACTIVE);
             } else {
                 // Apply the new KRaft state
-                apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch()));
+                applyMigrationOperation("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch()));
 
                 // Before becoming the controller fo ZkBrokers, we need to make sure the
                 // Controller Quorum can handle migration.
@@ -492,7 +495,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         @Override
         public void run() throws Exception {
             if (migrationState == MigrationDriverState.BECOME_CONTROLLER) {
-                apply("BecomeZkLeaderEvent", zkMigrationClient::claimControllerLeadership);
+                applyMigrationOperation("BecomeZkLeaderEvent", zkMigrationClient::claimControllerLeadership);
                 if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
                     log.debug("Unable to claim leadership, will retry until we learn of a different KRaft leader");
                 } else {
@@ -563,7 +566,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
                     offsetAndEpochAfterMigration.offset(),
                     offsetAndEpochAfterMigration.epoch());
-                apply("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState));
+                applyMigrationOperation("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState));
                 transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
             } catch (Throwable t) {
                 zkRecordConsumer.abortMigration();
@@ -630,149 +633,28 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 propagator.setMetadataVersion(image.features().metadataVersion());
             }
 
-            if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
-                if (delta.topicsDelta() != null) {
-                    delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
-                        if (delta.topicsDelta().createdTopicIds().contains(topicId)) {
-                            apply("Create topic " + topicDelta.name(), migrationState ->
-                                zkMigrationClient.createTopic(
-                                    topicDelta.name(),
-                                    topicId,
-                                    topicDelta.partitionChanges(),
-                                    migrationState));
-                        } else {
-                            apply("Updating topic " + topicDelta.name(), migrationState ->
-                                zkMigrationClient.updateTopicPartitions(
-                                    Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
-                                    migrationState));
-                        }
-                    });
-                }
-
-                // For configs and client quotas, we need to send all of the data to the ZK
-                // client since we persist everything for a given entity in a single ZK node.
-                if (delta.configsDelta() != null) {
-                    delta.configsDelta().changes().forEach((configResource, configDelta) ->
-                        apply("Updating config resource " + configResource, migrationState ->
-                            zkMigrationClient.writeConfigs(configResource, image.configs().configMapForResource(configResource), migrationState)));
-                }
-
-                if ((delta.clientQuotasDelta() != null) || (delta.scramDelta() != null)) {
-                    // A list of users with scram or quota changes
-                    HashSet<String> users = new HashSet<String>();
-
-                    // Populate list with users with scram changes
-                    if (delta.scramDelta() != null) {
-                        delta.scramDelta().changes().forEach((scramMechanism, changes) -> {
-                            changes.forEach((userName, changeOpt) -> users.add(userName));
-                        });
-                    }
-
-                    // Populate list with users with quota changes 
-                    // and apply quota changes to all non user quota changes
-                    if (delta.clientQuotasDelta() != null) {
-                        Map<String, String> scramMap = new HashMap<String, String>();
-                        delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
-
-                            if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
-                                (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
-                                String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
-                                // Add clientQuotaEntity to list to process at the end
-                                users.add(userName);
-                            } else {
-                                Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                                apply("Updating client quota " + clientQuotaEntity, migrationState -> 
-                                    zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, scramMap, migrationState));
-                            }
-                        });
-                    }
-                    // Update user scram and quota data for each user with changes in either.
-                    users.forEach(userName -> {
-                        Map<String, String> userScramMap = getScramCredentialStringsForUser(userName);
-                        ClientQuotaEntity clientQuotaEntity = new
-                            ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
-                        if (image.clientQuotas() == null) {
-                            Map<String, Double> quotaMap = new HashMap<String, Double>();
-                            apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                                zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
-                        } else {
-                            Map<String, Double> quotaMap = image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
-                            apply("Updating client quota " + clientQuotaEntity, migrationState ->
-                                zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
-                        }
-                    });
-                }
-
-                if (delta.producerIdsDelta() != null) {
-                    apply("Updating next producer ID", migrationState ->
-                        zkMigrationClient.writeProducerId(delta.producerIdsDelta().nextProducerId(), migrationState));
-                }
-
-                if (delta.aclsDelta() != null) {
-                    Map<ResourcePattern, List<AccessControlEntry>> deletedAcls = new HashMap<>();
-                    Map<ResourcePattern, List<AccessControlEntry>> addedAcls = new HashMap<>();
-                    delta.aclsDelta().changes().forEach((uuid, standardAclOpt) -> {
-                        if (!standardAclOpt.isPresent()) {
-                            StandardAcl acl = prevImage.acls().acls().get(uuid);
-                            if (acl != null) {
-                                addStandardAclToMap(deletedAcls, acl);
-                            } else {
-                                throw new RuntimeException("Cannot remove deleted ACL " + uuid + " from ZK since it is " +
-                                    "not present in the previous AclsImage");
-                            }
-                        } else {
-                            StandardAcl acl = standardAclOpt.get();
-                            addStandardAclToMap(addedAcls, acl);
-                        }
-                    });
-                    deletedAcls.forEach((resourcePattern, accessControlEntries) -> {
-                        String name = "Deleting " + accessControlEntries.size() + " for resource " + resourcePattern;
-                        apply(name, migrationState ->
-                            zkMigrationClient.removeDeletedAcls(resourcePattern, accessControlEntries, migrationState));
-                    });
-
-                    addedAcls.forEach((resourcePattern, accessControlEntries) -> {
-                        String name = "Adding " + accessControlEntries.size() + " for resource " + resourcePattern;
-                        apply(name, migrationState ->
-                            zkMigrationClient.writeAddedAcls(resourcePattern, accessControlEntries, migrationState));
-                    });
-                }
+            if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
+                log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
+                completionHandler.accept(null);
+            }
 
-                // TODO: Unhappy path: Probably relinquish leadership and let new controller
-                //  retry the write?
-                if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
-                    log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
-                    propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
-                        migrationLeadershipState.zkControllerEpoch());
-                } else {
-                    log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
-                }
+            if (isSnapshot) {
+                zkMetadataWriter.handleSnapshot(image);
             } else {
-                log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
+                zkMetadataWriter.handleDelta(prevImage, image, delta);
             }
-            completionHandler.accept(null);
-        }
 
-        private Map<String, String> getScramCredentialStringsForUser(String userName) {
-            Map<String, String> userScramCredentialStrings = new HashMap<String, String>();
-            if (image.scram() != null) {
-                image.scram().mechanisms().forEach((scramMechanism, scramMechanismMap) -> {
-                    ScramCredentialData scramCredentialData = scramMechanismMap.get(userName);
-                    if (scramCredentialData != null) {
-                        userScramCredentialStrings.put(scramMechanism.mechanismName(),
-                            ScramCredentialUtils.credentialToString(
-                                scramCredentialData.toCredential(scramMechanism)));
-                    }
-                });
+            // TODO: Unhappy path: Probably relinquish leadership and let new controller
+            //  retry the write?
+            if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+                log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
+                propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
+                        migrationLeadershipState.zkControllerEpoch());
+            } else {
+                log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
             }
-            return userScramCredentialStrings;
-        }
 
-        private void addStandardAclToMap(Map<ResourcePattern, List<AccessControlEntry>> aclMap, StandardAcl acl) {
-            ResourcePattern resource = new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
-            aclMap.computeIfAbsent(resource, __ -> new ArrayList<>()).add(
-                new AccessControlEntry(acl.principal(), acl.host(), acl.operation(), acl.permissionType())
-            );
+            completionHandler.accept(null);
         }
 
         @Override
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperation.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperation.java
new file mode 100644
index 00000000000..55764c4cd1c
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationOperation.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+@FunctionalInterface
+public interface KRaftMigrationOperation {
+    ZkMigrationLeadershipState apply(ZkMigrationLeadershipState migrationState);
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
new file mode 100644
index 00000000000..a324165c897
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
+import org.apache.kafka.image.AclsDelta;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.ClientQuotaImage;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ConfigurationsDelta;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.ScramImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.ScramCredentialData;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KRaftMigrationZkWriter {
+    private final MigrationClient migrationClient;
+    private final BiConsumer<String, KRaftMigrationOperation> operationConsumer;
+
+    public KRaftMigrationZkWriter(
+        MigrationClient migrationClient,
+        BiConsumer<String, KRaftMigrationOperation>  operationConsumer
+    ) {
+        this.migrationClient = migrationClient;
+        this.operationConsumer = operationConsumer;
+    }
+
+    public void handleSnapshot(MetadataImage image) {
+        handleTopicsSnapshot(image.topics());
+        handleConfigsSnapshot(image.configs());
+        handleClientQuotasSnapshot(image.clientQuotas(), image.scram());
+        operationConsumer.accept("Setting next producer ID", migrationState ->
+            migrationClient.writeProducerId(image.producerIds().highestSeenProducerId(), migrationState));
+        handleAclsSnapshot(image.acls());
+    }
+
+    public void handleDelta(MetadataImage previousImage, MetadataImage image, MetadataDelta delta) {
+        if (delta.topicsDelta() != null) {
+            handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, delta.topicsDelta());
+        }
+        if (delta.configsDelta() != null) {
+            handleConfigsDelta(image.configs(), delta.configsDelta());
+        }
+        if (delta.clientQuotasDelta() != null) {
+            handleClientQuotasDelta(image, delta);
+        }
+        if (delta.producerIdsDelta() != null) {
+            operationConsumer.accept("Updating next producer ID", migrationState ->
+                migrationClient.writeProducerId(delta.producerIdsDelta().nextProducerId(), migrationState));
+        }
+        if (delta.aclsDelta() != null) {
+            handleAclsDelta(image.acls(), delta.aclsDelta());
+        }
+    }
+
+    /**
+     * Handle a snapshot of the topic metadata. This requires scanning through all the topics and partitions
+     * in ZooKeeper to determine what has changed.
+     */
+    void handleTopicsSnapshot(TopicsImage topicsImage) {
+        Map<Uuid, String> deletedTopics = new HashMap<>();
+        Set<Uuid> createdTopics = new HashSet<>(topicsImage.topicsById().keySet());
+        Map<Uuid, Map<Integer, PartitionRegistration>> changedPartitions = new HashMap<>();
+
+        migrationClient.topicClient().iterateTopics(
+            EnumSet.of(
+                TopicMigrationClient.TopicVisitorInterest.TOPICS,
+                TopicMigrationClient.TopicVisitorInterest.PARTITIONS),
+            new TopicMigrationClient.TopicVisitor() {
+                @Override
+                public void visitTopic(String topicName, Uuid topicId, Map<Integer, List<Integer>> assignments) {
+                    TopicImage topic = topicsImage.getTopic(topicId);
+                    if (topic == null) {
+                        // If KRaft does not have this topic, it was deleted
+                        deletedTopics.put(topicId, topicName);
+                    } else {
+                        createdTopics.remove(topicId);
+                    }
+                }
+
+                @Override
+                public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistration partitionRegistration) {
+                    TopicImage topic = topicsImage.getTopic(topicIdPartition.topicId());
+                    if (topic == null) {
+                        return; // topic deleted in KRaft
+                    }
+
+                    // Check if the KRaft partition state changed
+                    PartitionRegistration kraftPartition = topic.partitions().get(topicIdPartition.partition());
+                    if (!kraftPartition.equals(partitionRegistration)) {
+                        changedPartitions.computeIfAbsent(topicIdPartition.topicId(), __ -> new HashMap<>())
+                            .put(topicIdPartition.partition(), kraftPartition);
+                    }
+                }
+            });
+
+        createdTopics.forEach(topicId -> {
+            TopicImage topic = topicsImage.getTopic(topicId);
+            operationConsumer.accept(
+                "Create Topic " + topic.name() + ", ID " + topicId,
+                migrationState -> migrationClient.topicClient().createTopic(topic.name(), topicId, topic.partitions(), migrationState)
+            );
+        });
+
+        deletedTopics.forEach((topicId, topicName) -> {
+            operationConsumer.accept(
+                "Delete Topic " + topicName + ", ID " + topicId,
+                migrationState -> migrationClient.topicClient().deleteTopic(topicName, migrationState)
+            );
+            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
+            operationConsumer.accept(
+                "Updating Configs for Topic " + topicName + ", ID " + topicId,
+                migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState)
+            );
+        });
+
+        changedPartitions.forEach((topicId, paritionMap) -> {
+            TopicImage topic = topicsImage.getTopic(topicId);
+            operationConsumer.accept(
+                "Updating Partitions for Topic " + topic.name() + ", ID " + topicId,
+                migrationState -> migrationClient.topicClient().updateTopicPartitions(
+                    Collections.singletonMap(topic.name(), paritionMap),
+                    migrationState));
+        });
+    }
+
+    void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsDelta topicsDelta) {
+        topicsDelta.deletedTopicIds().forEach(topicId -> {
+            String name = deletedTopicNameResolver.apply(topicId);
+            operationConsumer.accept("Deleting topic " + name + ", ID " + topicId,
+                migrationState -> migrationClient.topicClient().deleteTopic(name, migrationState));
+        });
+
+        topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
+            if (topicsDelta.createdTopicIds().contains(topicId)) {
+                operationConsumer.accept(
+                    "Create Topic " + topicDelta.name() + ", ID " + topicId,
+                    migrationState -> migrationClient.topicClient().createTopic(
+                        topicDelta.name(),
+                        topicId,
+                        topicDelta.partitionChanges(),
+                        migrationState));
+            } else {
+                operationConsumer.accept(
+                    "Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId,
+                    migrationState -> migrationClient.topicClient().updateTopicPartitions(
+                        Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()),
+                        migrationState));
+            }
+        });
+    }
+
+    void handleConfigsSnapshot(ConfigurationsImage configsImage) {
+        Set<ConfigResource> brokersToUpdate = new HashSet<>();
+        migrationClient.configClient().iterateBrokerConfigs((broker, configs) -> {
+            ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, broker);
+            Map<String, String> kraftProps = configsImage.configMapForResource(brokerResource);
+            if (!kraftProps.equals(configs)) {
+                brokersToUpdate.add(brokerResource);
+            }
+        });
+
+        brokersToUpdate.forEach(brokerResource -> {
+            Map<String, String> props = configsImage.configMapForResource(brokerResource);
+            if (props.isEmpty()) {
+                operationConsumer.accept("Delete configs for broker " + brokerResource.name(), migrationState ->
+                    migrationClient.configClient().deleteConfigs(brokerResource, migrationState));
+            } else {
+                operationConsumer.accept("Update configs for broker " + brokerResource.name(), migrationState ->
+                    migrationClient.configClient().writeConfigs(brokerResource, props, migrationState));
+            }
+        });
+    }
+
+    private Map<String, String> getScramCredentialStringsForUser(ScramImage image, String userName) {
+        Map<String, String> userScramCredentialStrings = new HashMap<>();
+        if (image != null) {
+            image.mechanisms().forEach((scramMechanism, scramMechanismMap) -> {
+                ScramCredentialData scramCredentialData = scramMechanismMap.get(userName);
+                if (scramCredentialData != null) {
+                    userScramCredentialStrings.put(scramMechanism.mechanismName(),
+                        ScramCredentialUtils.credentialToString(scramCredentialData.toCredential(scramMechanism)));
+                }
+            });
+        }
+        return userScramCredentialStrings;
+    }
+
+    void handleClientQuotasSnapshot(ClientQuotasImage clientQuotasImage, ScramImage scramImage) {
+        Set<ClientQuotaEntity> changedNonUserEntities = new HashSet<>();
+        Set<String> changedUsers = new HashSet<>();
+        migrationClient.configClient().iterateClientQuotas(new ConfigMigrationClient.ClientQuotaVisitor() {
+            @Override
+            public void visitClientQuota(List<ClientQuotaRecord.EntityData> entityDataList, Map<String, Double> quotas) {
+                Map<String, String> entityMap = new HashMap<>(2);
+                entityDataList.forEach(entityData -> entityMap.put(entityData.entityType(), entityData.entityName()));
+                ClientQuotaEntity entity = new ClientQuotaEntity(entityMap);
+                if (!clientQuotasImage.entities().getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap().equals(quotas)) {
+                    if (entity.entries().containsKey(ClientQuotaEntity.USER) &&
+                        !entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)) {
+                        // Track regular user entities separately
+                        changedUsers.add(entityMap.get(ClientQuotaEntity.USER));
+                    } else {
+                        changedNonUserEntities.add(entity);
+                    }
+                }
+            }
+
+            @Override
+            public void visitScramCredential(String userName, ScramMechanism scramMechanism, ScramCredential scramCredential) {
+                // For each ZK entity, see if it exists in the image and if it's equal
+                ScramCredentialData data = scramImage.mechanisms().getOrDefault(scramMechanism, Collections.emptyMap()).get(userName);
+                if (data == null || !data.toCredential(scramMechanism).equals(scramCredential)) {
+                    changedUsers.add(userName);
+                }
+            }
+        });
+
+        changedNonUserEntities.forEach(entity -> {
+            Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
+            operationConsumer.accept("Update client quotas for " + entity, migrationState ->
+                migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, Collections.emptyMap(), migrationState));
+        });
+
+        changedUsers.forEach(userName -> {
+            ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
+            Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
+            Map<String, String> scramMap = getScramCredentialStringsForUser(scramImage, userName);
+            operationConsumer.accept("Update scram credentials for " + userName, migrationState ->
+                migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, scramMap, migrationState));
+        });
+
+
+    }
+
+    void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta) {
+        Set<ConfigResource> updatedResources = configsDelta.changes().keySet();
+        updatedResources.forEach(configResource -> {
+            Map<String, String> props = configsImage.configMapForResource(configResource);
+            if (props.isEmpty()) {
+                operationConsumer.accept("Delete configs for " + configResource, migrationState ->
+                    migrationClient.configClient().deleteConfigs(configResource, migrationState));
+            } else {
+                operationConsumer.accept("Update configs for " + configResource, migrationState ->
+                    migrationClient.configClient().writeConfigs(configResource, props, migrationState));
+            }
+        });
+    }
+
+    void handleClientQuotasDelta(MetadataImage metadataImage, MetadataDelta metadataDelta) {
+        if ((metadataDelta.clientQuotasDelta() != null) || (metadataDelta.scramDelta() != null)) {
+            // A list of users with scram or quota changes
+            HashSet<String> users = new HashSet<>();
+
+            // Populate list with users with scram changes
+            if (metadataDelta.scramDelta() != null) {
+                metadataDelta.scramDelta().changes().forEach((scramMechanism, changes) -> {
+                    changes.forEach((userName, changeOpt) -> users.add(userName));
+                });
+            }
+
+            // Populate list with users with quota changes
+            // and apply quota changes to all non-user quota changes
+            if (metadataDelta.clientQuotasDelta() != null) {
+                metadataDelta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
+                    if ((clientQuotaEntity.entries().containsKey(ClientQuotaEntity.USER)) &&
+                            (!clientQuotaEntity.entries().containsKey(ClientQuotaEntity.CLIENT_ID))) {
+                        String userName = clientQuotaEntity.entries().get(ClientQuotaEntity.USER);
+                        // Add clientQuotaEntity to list to process at the end
+                        users.add(userName);
+                    } else {
+                        Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                        operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState ->
+                            migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, Collections.emptyMap(), migrationState));
+                    }
+                });
+            }
+
+            // Update user scram and quota data for each user with changes in either.
+            users.forEach(userName -> {
+                Map<String, String> userScramMap = getScramCredentialStringsForUser(metadataImage.scram(), userName);
+                ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
+                if (metadataImage.clientQuotas() == null) {
+                    operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState ->
+                        migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), Collections.emptyMap(), userScramMap, migrationState));
+                } else {
+                    Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                    operationConsumer.accept("Updating client quota " + clientQuotaEntity, migrationState ->
+                        migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
+                }
+            });
+        }
+    }
+
+    private ResourcePattern resourcePatternFromAcl(StandardAcl acl) {
+        return new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
+    }
+
+    void handleAclsSnapshot(AclsImage image) {
+        // Need to compare contents of image with all ACLs in ZK and issue updates
+        Map<ResourcePattern, Set<AccessControlEntry>> allAclsInSnapshot = new HashMap<>();
+
+        image.acls().values().forEach(standardAcl -> {
+            ResourcePattern resourcePattern = resourcePatternFromAcl(standardAcl);
+            allAclsInSnapshot.computeIfAbsent(resourcePattern, __ -> new HashSet<>()).add(
+                new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType())
+            );
+        });
+
+        Set<ResourcePattern> resourcesToDelete = new HashSet<>();
+        Map<ResourcePattern, Set<AccessControlEntry>> changedResources = new HashMap<>();
+        migrationClient.aclClient().iterateAcls((resourcePattern, accessControlEntries) -> {
+            if (!allAclsInSnapshot.containsKey(resourcePattern)) {
+                resourcesToDelete.add(resourcePattern);
+            } else {
+                Set<AccessControlEntry> snapshotEntries = allAclsInSnapshot.get(resourcePattern);
+                if (!snapshotEntries.equals(accessControlEntries)) {
+                    changedResources.put(resourcePattern, snapshotEntries);
+                }
+            }
+        });
+
+        resourcesToDelete.forEach(deletedResource -> {
+            String name = "Deleting resource " + deletedResource + " which has no ACLs in snapshot";
+            operationConsumer.accept(name, migrationState ->
+                migrationClient.aclClient().deleteResource(deletedResource, migrationState));
+        });
+
+        changedResources.forEach((resourcePattern, accessControlEntries) -> {
+            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
+            operationConsumer.accept(name, migrationState ->
+                migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
+        });
+    }
+
+    void handleAclsDelta(AclsImage image, AclsDelta delta) {
+        // Compute the resource patterns that were changed
+        Set<ResourcePattern> resourcesWithChangedAcls = delta.changes().values()
+            .stream()
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .map(this::resourcePatternFromAcl)
+            .collect(Collectors.toSet());
+
+        Set<ResourcePattern> resourcesWithDeletedAcls = delta.deleted()
+            .stream()
+            .map(this::resourcePatternFromAcl)
+            .collect(Collectors.toSet());
+
+        // Need to collect all ACLs for any changed resource pattern
+        Map<ResourcePattern, List<AccessControlEntry>> aclsToWrite = new HashMap<>();
+        image.acls().forEach((uuid, standardAcl) -> {
+            ResourcePattern resourcePattern = resourcePatternFromAcl(standardAcl);
+            boolean removed = resourcesWithDeletedAcls.remove(resourcePattern);
+            // If a resource pattern is present in the delta as a changed or deleted acl, need to include it
+            if (resourcesWithChangedAcls.contains(resourcePattern) || removed) {
+                aclsToWrite.computeIfAbsent(resourcePattern, __ -> new ArrayList<>()).add(
+                    new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType())
+                );
+            }
+        });
+
+        resourcesWithDeletedAcls.forEach(deletedResource -> {
+            String name = "Deleting resource " + deletedResource + " which has no more ACLs";
+            operationConsumer.accept(name, migrationState ->
+                migrationClient.aclClient().deleteResource(deletedResource, migrationState));
+        });
+
+        aclsToWrite.forEach((resourcePattern, accessControlEntries) -> {
+            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
+            operationConsumer.accept(name, migrationState ->
+                migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
+        });
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
index 8ed1cd2e7ae..286e261b2c6 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -16,17 +16,10 @@
  */
 package org.apache.kafka.metadata.migration;
 
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.acl.AccessControlEntry;
-import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.resource.ResourcePattern;
-import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 /**
@@ -75,53 +68,18 @@ public interface MigrationClient {
      */
     ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state);
 
-    ZkMigrationLeadershipState createTopic(
-        String topicName,
-        Uuid topicId,
-        Map<Integer, PartitionRegistration> topicPartitions,
-        ZkMigrationLeadershipState state
-    );
+    TopicMigrationClient topicClient();
 
-    ZkMigrationLeadershipState updateTopicPartitions(
-        Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
-        ZkMigrationLeadershipState state
-    );
+    ConfigMigrationClient configClient();
 
-    ZkMigrationLeadershipState writeConfigs(
-        ConfigResource configResource,
-        Map<String, String> configMap,
-        ZkMigrationLeadershipState state
-    );
-
-    ZkMigrationLeadershipState writeClientQuotas(
-        Map<String, String> clientQuotaEntity,
-        Map<String, Double> quotas,
-        Map<String, String> scram,
-        ZkMigrationLeadershipState state
-    );
+    AclMigrationClient aclClient();
 
     ZkMigrationLeadershipState writeProducerId(
         long nextProducerId,
         ZkMigrationLeadershipState state
     );
 
-    ZkMigrationLeadershipState removeDeletedAcls(
-        ResourcePattern resourcePattern,
-        List<AccessControlEntry> deletedAcls,
-        ZkMigrationLeadershipState state
-    );
-
-    ZkMigrationLeadershipState writeAddedAcls(
-        ResourcePattern resourcePattern,
-        List<AccessControlEntry> addedAcls,
-        ZkMigrationLeadershipState state
-    );
-
-    void iterateAcls(BiConsumer<ResourcePattern, Set<AccessControlEntry>> aclConsumer);
-
     void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
 
     Set<Integer> readBrokerIds();
-
-    Set<Integer> readBrokerIdsFromTopicAssignments();
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
new file mode 100644
index 00000000000..e373d066f2c
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public interface TopicMigrationClient {
+
+    enum TopicVisitorInterest {
+        TOPICS,
+        PARTITIONS,
+        CONFIGS
+    }
+
+    interface TopicVisitor {
+        void visitTopic(String topicName, Uuid topicId, Map<Integer, List<Integer>> assignments);
+        default void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistration partitionRegistration) {
+
+        }
+        default void visitConfigs(String topicName, Properties topicProps) {
+
+        }
+    }
+
+    void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor);
+
+    ZkMigrationLeadershipState deleteTopic(
+        String topicName,
+        ZkMigrationLeadershipState state
+    );
+
+    ZkMigrationLeadershipState createTopic(
+        String topicName,
+        Uuid topicId,
+        Map<Integer, PartitionRegistration> topicPartitions,
+        ZkMigrationLeadershipState state
+    );
+
+    ZkMigrationLeadershipState updateTopicPartitions(
+        Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
+        ZkMigrationLeadershipState state
+    );
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
index 1d27a4e41cd..4a02235ce25 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
@@ -138,6 +138,21 @@ public class ZkMigrationLeadershipState {
         return new OffsetAndEpoch(kraftMetadataOffset, kraftMetadataEpoch);
     }
 
+    public boolean loggableChangeSinceState(ZkMigrationLeadershipState other) {
+        if (other == null) {
+            return false;
+        }
+        if (this.equals(other)) {
+            return false;
+        } else {
+            // Did the controller change, or did we finish the migration?
+            return
+                this.kraftControllerId != other.kraftControllerId ||
+                this.kraftControllerEpoch != other.kraftControllerEpoch ||
+                (!other.zkMigrationComplete() && this.zkMigrationComplete());
+        }
+    }
+
     @Override
     public String toString() {
         return "ZkMigrationLeadershipState{" +
@@ -176,7 +191,7 @@ public class ZkMigrationLeadershipState {
             kraftMetadataEpoch,
             lastUpdatedTimeMs,
             migrationZkVersion,
-                zkControllerEpoch,
-                zkControllerEpochZkVersion);
+            zkControllerEpoch,
+            zkControllerEpochZkVersion);
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 11af3488bf5..1de99df61f2 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -56,9 +56,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class TopicsImageTest {
-    static final TopicsImage IMAGE1;
+    public static final TopicsImage IMAGE1;
 
-    static final List<ApiMessageAndVersion> DELTA1_RECORDS;
+    public static final List<ApiMessageAndVersion> DELTA1_RECORDS;
 
     static final TopicsDelta DELTA1;
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingAclMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingAclMigrationClient.java
new file mode 100644
index 00000000000..717750379a5
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingAclMigrationClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.resource.ResourcePattern;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+public class CapturingAclMigrationClient implements AclMigrationClient {
+
+    public List<ResourcePattern> deletedResources = new ArrayList<>();
+    public LinkedHashMap<ResourcePattern, Collection<AccessControlEntry>> updatedResources = new LinkedHashMap<>();
+
+    public void reset() {
+        deletedResources.clear();
+        updatedResources.clear();
+    }
+
+    @Override
+    public ZkMigrationLeadershipState deleteResource(ResourcePattern resourcePattern, ZkMigrationLeadershipState state) {
+        deletedResources.add(resourcePattern);
+        return state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState writeResourceAcls(ResourcePattern resourcePattern, Collection<AccessControlEntry> aclsToWrite, ZkMigrationLeadershipState state) {
+        updatedResources.put(resourcePattern, aclsToWrite);
+        return state;
+    }
+
+    @Override
+    public void iterateAcls(BiConsumer<ResourcePattern, Set<AccessControlEntry>> aclConsumer) {
+
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java
new file mode 100644
index 00000000000..19ed12a381b
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingConfigMigrationClient.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.config.ConfigResource;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+public class CapturingConfigMigrationClient implements ConfigMigrationClient {
+    public List<ConfigResource> deletedResources = new ArrayList<>();
+    public LinkedHashMap<ConfigResource, Map<String, String>> writtenConfigs = new LinkedHashMap<>();
+
+    public void reset() {
+        deletedResources.clear();
+        writtenConfigs.clear();
+    }
+
+    @Override
+    public void iterateClientQuotas(ClientQuotaVisitor visitor) {
+
+    }
+
+    @Override
+    public void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer) {
+
+    }
+
+    @Override
+    public ZkMigrationLeadershipState writeConfigs(ConfigResource configResource, Map<String, String> configMap, ZkMigrationLeadershipState state) {
+        writtenConfigs.put(configResource, configMap);
+        return state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState writeClientQuotas(Map<String, String> clientQuotaEntity, Map<String, Double> quotas, Map<String, String> scram, ZkMigrationLeadershipState state) {
+        return null;
+    }
+
+
+    @Override
+    public ZkMigrationLeadershipState deleteConfigs(ConfigResource configResource, ZkMigrationLeadershipState state) {
+        deletedResources.add(configResource);
+        return state;
+    }
+
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
new file mode 100644
index 00000000000..8d4b70dc549
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+class CapturingMigrationClient implements MigrationClient {
+
+    static Builder newBuilder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        Set<Integer> brokersInZk = Collections.emptySet();
+        TopicMigrationClient topicMigrationClient = new CapturingTopicMigrationClient();
+        ConfigMigrationClient configMigrationClient = new CapturingConfigMigrationClient();
+        AclMigrationClient aclMigrationClient = new CapturingAclMigrationClient();
+
+        public Builder setBrokersInZk(int... brokerIds) {
+            brokersInZk = IntStream.of(brokerIds).boxed().collect(Collectors.toSet());
+            return this;
+        }
+
+        public Builder setTopicMigrationClient(TopicMigrationClient topicMigrationClient) {
+            this.topicMigrationClient = topicMigrationClient;
+            return this;
+        }
+
+        public Builder setConfigMigrationClient(ConfigMigrationClient configMigrationClient) {
+            this.configMigrationClient = configMigrationClient;
+            return this;
+        }
+
+        public CapturingMigrationClient build() {
+            return new CapturingMigrationClient(
+                brokersInZk,
+                topicMigrationClient,
+                configMigrationClient,
+                aclMigrationClient
+            );
+        }
+    }
+
+    private final Set<Integer> brokerIds;
+    private final TopicMigrationClient topicMigrationClient;
+    private final ConfigMigrationClient configMigrationClient;
+    private final AclMigrationClient aclMigrationClient;
+
+    private ZkMigrationLeadershipState state = null;
+
+    CapturingMigrationClient(
+        Set<Integer> brokerIdsInZk,
+        TopicMigrationClient topicMigrationClient,
+        ConfigMigrationClient configMigrationClient,
+        AclMigrationClient aclMigrationClient
+    ) {
+        this.brokerIds = brokerIdsInZk;
+        this.topicMigrationClient = topicMigrationClient;
+        this.configMigrationClient = configMigrationClient;
+        this.aclMigrationClient = aclMigrationClient;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
+        if (this.state == null) {
+            this.state = initialState;
+        }
+        return this.state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState setMigrationRecoveryState(ZkMigrationLeadershipState state) {
+        this.state = state;
+        return state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
+        this.state = state;
+        return state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state) {
+        this.state = state;
+        return state;
+    }
+
+
+    @Override
+    public TopicMigrationClient topicClient() {
+        return topicMigrationClient;
+    }
+
+    @Override
+    public ConfigMigrationClient configClient() {
+        return configMigrationClient;
+    }
+
+    @Override
+    public AclMigrationClient aclClient() {
+        return aclMigrationClient;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState writeProducerId(
+        long nextProducerId,
+        ZkMigrationLeadershipState state
+    ) {
+        this.state = state;
+        return state;
+    }
+
+    @Override
+    public void readAllMetadata(
+        Consumer<List<ApiMessageAndVersion>> batchConsumer,
+        Consumer<Integer> brokerIdConsumer
+    ) {
+
+    }
+
+    @Override
+    public Set<Integer> readBrokerIds() {
+        return brokerIds;
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
new file mode 100644
index 00000000000..796d211f4d6
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CapturingTopicMigrationClient implements TopicMigrationClient {
+    public List<String> deletedTopics = new ArrayList<>();
+    public List<String> createdTopics = new ArrayList<>();
+    public LinkedHashMap<String, Set<Integer>> updatedTopicPartitions = new LinkedHashMap<>();
+
+    public void reset() {
+        createdTopics.clear();
+        updatedTopicPartitions.clear();
+        deletedTopics.clear();
+    }
+
+
+    @Override
+    public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+        
+    }
+
+    @Override
+    public ZkMigrationLeadershipState deleteTopic(String topicName, ZkMigrationLeadershipState state) {
+        deletedTopics.add(topicName);
+        return state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState createTopic(String topicName, Uuid topicId, Map<Integer, PartitionRegistration> topicPartitions, ZkMigrationLeadershipState state) {
+        createdTopics.add(topicName);
+        return state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState updateTopicPartitions(Map<String, Map<Integer, PartitionRegistration>> topicPartitions, ZkMigrationLeadershipState state) {
+        topicPartitions.forEach((topicName, partitionMap) ->
+            updatedTopicPartitions.put(topicName, partitionMap.keySet())
+        );
+        return state;
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index d4e7db6dd91..0f84a068b81 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -19,23 +19,30 @@ package org.apache.kafka.metadata.migration;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
-import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.controller.QuorumFeatures;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.image.ClientQuotasImage;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.apache.kafka.image.FeaturesImage;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.ProducerIdsImage;
+import org.apache.kafka.image.ScramImage;
 import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.image.loader.SnapshotManifest;
 import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
-import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -50,17 +57,23 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalInt;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.image.TopicsImageTest.DELTA1_RECORDS;
+import static org.apache.kafka.image.TopicsImageTest.IMAGE1;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -112,137 +125,6 @@ public class KRaftMigrationDriverTest {
         }
     }
 
-    static class CapturingMigrationClient implements MigrationClient {
-
-        private final Set<Integer> brokerIds;
-        public final Map<ConfigResource, Map<String, String>> capturedConfigs = new HashMap<>();
-        private ZkMigrationLeadershipState state = null;
-
-        public CapturingMigrationClient(Set<Integer> brokerIdsInZk) {
-            this.brokerIds = brokerIdsInZk;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
-            if (state == null) {
-                state = initialState;
-            }
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState setMigrationRecoveryState(ZkMigrationLeadershipState state) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState createTopic(
-            String topicName,
-            Uuid topicId,
-            Map<Integer, PartitionRegistration> topicPartitions,
-            ZkMigrationLeadershipState state
-        ) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState updateTopicPartitions(
-            Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
-            ZkMigrationLeadershipState state
-        ) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState writeConfigs(
-            ConfigResource configResource,
-            Map<String, String> configMap,
-            ZkMigrationLeadershipState state
-        ) {
-            capturedConfigs.computeIfAbsent(configResource, __ -> new HashMap<>()).putAll(configMap);
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState writeClientQuotas(
-            Map<String, String> clientQuotaEntity,
-            Map<String, Double> quotas,
-            Map<String, String> scram,
-            ZkMigrationLeadershipState state
-        ) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState writeProducerId(
-            long nextProducerId,
-            ZkMigrationLeadershipState state
-        ) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState removeDeletedAcls(
-            ResourcePattern resourcePattern,
-            List<AccessControlEntry> deletedAcls,
-            ZkMigrationLeadershipState state
-        ) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public ZkMigrationLeadershipState writeAddedAcls(
-            ResourcePattern resourcePattern,
-            List<AccessControlEntry> addedAcls,
-            ZkMigrationLeadershipState state
-        ) {
-            this.state = state;
-            return state;
-        }
-
-        @Override
-        public void iterateAcls(BiConsumer<ResourcePattern, Set<AccessControlEntry>> aclConsumer) {
-
-        }
-
-        @Override
-        public void readAllMetadata(
-            Consumer<List<ApiMessageAndVersion>> batchConsumer,
-            Consumer<Integer> brokerIdConsumer
-        ) {
-
-        }
-
-        @Override
-        public Set<Integer> readBrokerIds() {
-            return brokerIds;
-        }
-
-        @Override
-        public Set<Integer> readBrokerIdsFromTopicAssignments() {
-            return brokerIds;
-        }
-    }
-
     static class CountingMetadataPropagator implements LegacyPropagator {
 
         public int deltas = 0;
@@ -326,7 +208,11 @@ public class KRaftMigrationDriverTest {
     @Test
     public void testOnlySendNeededRPCsToBrokers() throws Exception {
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
-        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)));
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(1, 2, 3)
+            .setConfigMigrationClient(configClient)
+            .build();
         try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
             3000,
             new NoOpRecordConsumer(),
@@ -357,8 +243,8 @@ public class KRaftMigrationDriverTest {
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                 "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
 
-            Assertions.assertEquals(1, metadataPropagator.images);
-            Assertions.assertEquals(0, metadataPropagator.deltas);
+            assertEquals(1, metadataPropagator.images);
+            assertEquals(0, metadataPropagator.deltas);
 
             delta = new MetadataDelta(image);
             delta.replay(new ConfigRecord()
@@ -370,9 +256,9 @@ public class KRaftMigrationDriverTest {
             image = delta.apply(provenance);
             enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
 
-            Assertions.assertEquals(1, migrationClient.capturedConfigs.size());
-            Assertions.assertEquals(1, metadataPropagator.images);
-            Assertions.assertEquals(0, metadataPropagator.deltas);
+            assertEquals(1, configClient.writtenConfigs.size());
+            assertEquals(1, metadataPropagator.images);
+            assertEquals(0, metadataPropagator.deltas);
 
             delta = new MetadataDelta(image);
             delta.replay(new BrokerRegistrationChangeRecord()
@@ -384,8 +270,8 @@ public class KRaftMigrationDriverTest {
             image = delta.apply(provenance);
             enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1, TimeUnit.MINUTES);
 
-            Assertions.assertEquals(1, metadataPropagator.images);
-            Assertions.assertEquals(1, metadataPropagator.deltas);
+            assertEquals(1, metadataPropagator.images);
+            assertEquals(1, metadataPropagator.deltas);
         }
     }
 
@@ -394,7 +280,7 @@ public class KRaftMigrationDriverTest {
     public void testMigrationWithClientException(boolean authException) throws Exception {
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
         CountDownLatch claimLeaderAttempts = new CountDownLatch(3);
-        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3))) {
+        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)), new CapturingTopicMigrationClient(), null, null) {
             @Override
             public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
                 if (claimLeaderAttempts.getCount() == 0) {
@@ -437,12 +323,12 @@ public class KRaftMigrationDriverTest {
             // Publish metadata of all the ZK brokers being ready
             driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
                 new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
-            Assertions.assertTrue(claimLeaderAttempts.await(1, TimeUnit.MINUTES));
+            assertTrue(claimLeaderAttempts.await(1, TimeUnit.MINUTES));
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                 "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
 
             if (authException) {
-                Assertions.assertEquals(MigrationClientAuthException.class, faultHandler.firstException().getCause().getClass());
+                assertEquals(MigrationClientAuthException.class, faultHandler.firstException().getCause().getClass());
             } else {
                 Assertions.assertNull(faultHandler.firstException());
             }
@@ -452,7 +338,7 @@ public class KRaftMigrationDriverTest {
     @Test
     public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate() throws Exception {
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
-        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1)));
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1).build();
         apiVersions.remove("6");
 
         try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
@@ -496,9 +382,10 @@ public class KRaftMigrationDriverTest {
         }
     }
 
+    @Test
     public void testSkipWaitForBrokersInDualWrite() throws Exception {
         CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
-        CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet());
+        CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(), null, null, null);
         MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
         try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
                 3000,
@@ -534,5 +421,157 @@ public class KRaftMigrationDriverTest {
                 "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
         }
     }
-}
 
+    @FunctionalInterface
+    interface TopicDualWriteVerifier {
+        void verify(
+            KRaftMigrationDriver driver,
+            CapturingTopicMigrationClient topicClient,
+            CapturingConfigMigrationClient configClient
+        ) throws Exception;
+    }
+
+    public void setupTopicDualWrite(TopicDualWriteVerifier verifier) throws Exception {
+        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                IMAGE1.topicsByName().forEach((topicName, topicImage) -> {
+                    Map<Integer, List<Integer>> assignment = new HashMap<>();
+                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
+                        assignment.put(partitionId, IntStream.of(partitionRegistration.replicas).boxed().collect(Collectors.toList()))
+                    );
+                    visitor.visitTopic(topicName, topicImage.id(), assignment);
+
+                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
+                        visitor.visitPartition(new TopicIdPartition(topicImage.id(), new TopicPartition(topicName, partitionId)), partitionRegistration)
+                    );
+                });
+            }
+        };
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0, 1, 2, 3, 4, 5)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .build();
+
+        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
+            3000,
+            new NoOpRecordConsumer(),
+            migrationClient,
+            metadataPropagator,
+            metadataPublisher -> { },
+            new MockFaultHandler("test"),
+            quorumFeatures,
+            mockTime
+        )) {
+            verifier.verify(driver, topicClient, configClient);
+        }
+    }
+
+    @Test
+    public void testTopicDualWriteSnapshot() throws Exception {
+        setupTopicDualWrite((driver, topicClient, configClient) -> {
+            MetadataImage image = new MetadataImage(
+                MetadataProvenance.EMPTY,
+                FeaturesImage.EMPTY,
+                ClusterImage.EMPTY,
+                IMAGE1,
+                ConfigurationsImage.EMPTY,
+                ClientQuotasImage.EMPTY,
+                ProducerIdsImage.EMPTY,
+                AclsImage.EMPTY,
+                ScramImage.EMPTY);
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(0));
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            delta.replay(zkBrokerRecord(4));
+            delta.replay(zkBrokerRecord(5));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            // Publish a delta with this node (3000) as the leader
+            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+            driver.onControllerChange(newLeader);
+            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+
+            // Wait for migration
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+
+            // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
+            provenance = new MetadataProvenance(200, 1, 1);
+            delta = new MetadataDelta(image);
+            RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
+            image = delta.apply(provenance);
+            driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100));
+            driver.migrationState().get(1, TimeUnit.MINUTES);
+
+            assertEquals(1, topicClient.deletedTopics.size());
+            assertEquals("foo", topicClient.deletedTopics.get(0));
+            assertEquals(1, topicClient.createdTopics.size());
+            assertEquals("baz", topicClient.createdTopics.get(0));
+            assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
+            assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
+        });
+    }
+
+    @Test
+    public void testTopicDualWriteDelta() throws Exception {
+        setupTopicDualWrite((driver, topicClient, configClient) -> {
+            MetadataImage image = new MetadataImage(
+                MetadataProvenance.EMPTY,
+                FeaturesImage.EMPTY,
+                ClusterImage.EMPTY,
+                IMAGE1,
+                ConfigurationsImage.EMPTY,
+                ClientQuotasImage.EMPTY,
+                ProducerIdsImage.EMPTY,
+                AclsImage.EMPTY,
+                ScramImage.EMPTY);
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(0));
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            delta.replay(zkBrokerRecord(4));
+            delta.replay(zkBrokerRecord(5));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            // Publish a delta with this node (3000) as the leader
+            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+            driver.onControllerChange(newLeader);
+            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+
+            // Wait for migration
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                    "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+
+            // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
+            provenance = new MetadataProvenance(200, 1, 1);
+            delta = new MetadataDelta(image);
+            RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
+            image = delta.apply(provenance);
+            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+            driver.migrationState().get(1, TimeUnit.MINUTES);
+
+            assertEquals(1, topicClient.deletedTopics.size());
+            assertEquals("foo", topicClient.deletedTopics.get(0));
+            assertEquals(1, topicClient.createdTopics.size());
+            assertEquals("baz", topicClient.createdTopics.get(0));
+            assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
+            assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
+        });
+    }
+}
\ No newline at end of file
diff --git a/tests/kafkatest/tests/core/zookeeper_migration_test.py b/tests/kafkatest/tests/core/zookeeper_migration_test.py
index 24530095871..a1be092d302 100644
--- a/tests/kafkatest/tests/core/zookeeper_migration_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_migration_test.py
@@ -117,6 +117,7 @@ class TestMigration(ProduceConsumeValidateTest):
                                         message_validator=is_int, version=DEV_BRANCH)
 
         self.run_produce_consume_validate(core_test_action=self.do_migration)
+        self.kafka.stop()
 
     @parametrize(metadata_quorum=isolated_kraft)
     def test_pre_migration_mode_3_4(self, metadata_quorum):
@@ -254,3 +255,4 @@ class TestMigration(ProduceConsumeValidateTest):
                     continue
 
         assert saw_expected_log, "Did not see expected INFO log after upgrading from a 3.4 migration"
+        self.kafka.stop()