Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/16 21:05:21 UTC

[kafka] 02/11: KAFKA-14427 ZK client support for migrations (#12946)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4b03c8c4c3dd5a0c32a11efabf9318883b0483ee
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Dec 8 13:14:01 2022 -0500

    KAFKA-14427 ZK client support for migrations (#12946)
    
    This patch adds support for reading and writing ZooKeeper metadata during a KIP-866 migration.
    
    For reading metadata from ZK, methods from KafkaZkClient and ZkData are reused to ensure we are decoding the JSON consistently.
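
    (Illustrative usage only, not part of this patch: the read path is exposed through the
    new ZkMigrationClient. Assuming a connected KafkaZkClient named zkClient, record batches
    and broker IDs are handed to consumers, much like the integration test added below.)

        val migrationClient = new ZkMigrationClient(zkClient)
        val brokers = new java.util.HashSet[Integer]()
        migrationClient.readAllMetadata(
          batch => batch.forEach(record => println(record.message())), // each batch of ApiMessageAndVersion
          brokerId => brokers.add(brokerId))                           // broker IDs discovered in ZK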
    
    For writing metadata, we use a new multi-op transaction that ensures only a single controller is writing to ZK. This is similar to the existing multi-op transaction that KafkaController uses, but it also includes a check on the new "/migration" ZNode. The transaction consists of three operations:
    
    * CheckOp on /controller_epoch
    * SetDataOp on /migration with zkVersion
    * CreateOp/SetDataOp/DeleteOp (the actual operation being applied)
    
    In the case of a batch of operations (such as topic creation), only the final MultiOp has a SetDataOp on /migration while the other requests use a CheckOp (similar to /controller_epoch).
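
    (Illustrative sketch only, not part of this patch: the shape of each wrapped write, using
    simplified stand-ins for the kafka.zookeeper op types that KafkaZkClient uses.)

        sealed trait ZkOp
        case class CheckOp(path: String, zkVersion: Int) extends ZkOp
        case class SetDataOp(path: String, data: Array[Byte], zkVersion: Int) extends ZkOp
        case class MultiOp(ops: Seq[ZkOp])

        // Check /controller_epoch, then guard /migration, then apply the actual operation.
        // Only the last request in a batch updates /migration, checkpointing migration progress.
        def wrapMigrationWrite(op: ZkOp,
                               controllerEpochZkVersion: Int,
                               migrationZkVersion: Int,
                               migrationStateJson: Array[Byte],
                               lastRequestInBatch: Boolean): MultiOp = {
          val epochCheck = CheckOp("/controller_epoch", controllerEpochZkVersion)
          val migrationGuard =
            if (lastRequestInBatch) SetDataOp("/migration", migrationStateJson, migrationZkVersion)
            else CheckOp("/migration", migrationZkVersion)
          MultiOp(Seq(epochCheck, migrationGuard, op))
        }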
    
    Reviewers: Colin Patrick McCabe <cm...@apache.org>, dengziming <de...@gmail.com>
---
 checkstyle/import-control.xml                      |   1 +
 .../main/scala/kafka/server/ZkAdminManager.scala   |  27 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 333 ++++++++++++++--
 core/src/main/scala/kafka/zk/ZkData.scala          |   9 +-
 .../main/scala/kafka/zk/ZkMigrationClient.scala    | 441 +++++++++++++++++++++
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 105 +++++
 .../unit/kafka/server/ZkAdminManagerTest.scala     |  28 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  51 ++-
 .../unit/kafka/zk/ZkMigrationClientTest.scala      | 353 +++++++++++++++++
 .../org/apache/kafka/image/ClientQuotasImage.java  |   3 +-
 .../kafka/metadata/migration/MigrationClient.java  |  91 +++++
 .../migration/ZkMigrationLeadershipState.java      |  49 ++-
 12 files changed, 1423 insertions(+), 68 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bd05521964e..7a62b671f84 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -227,6 +227,7 @@
     <allow pkg="org.apache.kafka.controller" />
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metadata.authorizer" />
+    <allow pkg="org.apache.kafka.metadata.migration" />
     <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.queue" />
     <allow pkg="org.apache.kafka.raft" />
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 634ce6b097c..1a22024d723 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -54,6 +54,19 @@ import org.apache.kafka.common.utils.Sanitizer
 import scala.collection.{Map, mutable, _}
 import scala.jdk.CollectionConverters._
 
+object ZkAdminManager {
+  def clientQuotaPropsToDoubleMap(props: Map[String, String]): Map[String, Double] = {
+    props.map { case (key, value) =>
+      val doubleValue = try value.toDouble catch {
+        case _: NumberFormatException =>
+          throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
+      }
+      key -> doubleValue
+    }
+  }
+}
+
+
 class ZkAdminManager(val config: KafkaConfig,
                      val metrics: Metrics,
                      val metadataCache: MetadataCache,
@@ -636,16 +649,6 @@ class ZkAdminManager(val config: KafkaConfig,
 
   private def sanitized(name: Option[String]): String = name.map(n => sanitizeEntityName(n)).getOrElse("")
 
-  private def fromProps(props: Map[String, String]): Map[String, Double] = {
-    props.map { case (key, value) =>
-      val doubleValue = try value.toDouble catch {
-        case _: NumberFormatException =>
-          throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
-      }
-      key -> doubleValue
-    }
-  }
-
   def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent],
     clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = {
 
@@ -706,7 +709,7 @@ class ZkAdminManager(val config: KafkaConfig,
     (userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) =>
       val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isClientOrUserQuotaConfig(key) }
       if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
-        Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
+        Some(userClientIdToEntity(u, c) -> ZkAdminManager.clientQuotaPropsToDoubleMap(quotaProps))
       else
         None
     }.toMap
@@ -732,7 +735,7 @@ class ZkAdminManager(val config: KafkaConfig,
     ipEntries.flatMap { case (ip, props) =>
       val ipQuotaProps = props.asScala.filter { case (key, _) => DynamicConfig.Ip.names.contains(key) }
       if (ipQuotaProps.nonEmpty)
-        Some(ipToQuotaEntity(ip) -> fromProps(ipQuotaProps))
+        Some(ipToQuotaEntity(ip) -> ZkAdminManager.clientQuotaPropsToDoubleMap(ipQuotaProps))
       else
         None
     }
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 747673d37db..12f4bfb2c3e 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
-import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
+import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult, SetDataResult}
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.common.ZKConfig
 import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
+import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper}
 
 import scala.collection.{Map, Seq, mutable}
 
@@ -156,6 +157,83 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     tryCreateControllerZNodeAndIncrementEpoch()
   }
 
+  /**
+   * Registers a given KRaft controller in ZooKeeper as the active controller. Unlike the ZK equivalent of this method,
+   * this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller
+   * leadership during a KRaft leadership failover.
+   *
+   * This method is called at the beginning of a KRaft migration and on subsequent KRaft leadership changes during
+   * the migration.
+   *
+   * To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
+   * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected while this method runs,
+   * the conditional update on /controller_epoch fails, which causes the whole multi-op transaction to fail.
+   *
+   * @param kraftControllerId ID of the KRaft controller node
+   * @param kraftControllerEpoch Epoch of the KRaft controller node
+   * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller.
+   */
+  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = {
+    val timestamp = time.milliseconds()
+    val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion))
+    val controllerOpt = getControllerId
+    val controllerEpochToStore = kraftControllerEpoch + 10000000 // TODO Remove this after KAFKA-14436
+    curEpochOpt match {
+      case None =>
+        throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
+          s"since there is no ZK controller epoch present.")
+      case Some((curEpoch: Int, curEpochZk: Int)) =>
+        if (curEpoch >= controllerEpochToStore) {
+          // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch than ZK
+          throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
+            s"in ZK since its epoch ${controllerEpochToStore} is not higher than the current ZK epoch ${curEpoch}.")
+        }
+
+        val response = if (controllerOpt.isDefined) {
+          info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " +
+            s"controller with epoch $controllerEpochToStore. The previous controller was ${controllerOpt.get}.")
+          retryRequestUntilConnected(
+            MultiRequest(Seq(
+              SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
+              DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion),
+              CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp),
+                defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
+          )
+        } else {
+          info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " +
+            s"controller with epoch $controllerEpochToStore. There was no active controller.")
+          retryRequestUntilConnected(
+            MultiRequest(Seq(
+              SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
+              CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp),
+                defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
+          )
+        }
+
+        val failureSuffix = s"while trying to register KRaft controller $kraftControllerId with epoch " +
+          s"$controllerEpochToStore. KRaft controller was not registered."
+        response.resultCode match {
+          case Code.OK =>
+            info(s"Successfully registered KRaft controller $kraftControllerId with epoch $controllerEpochToStore")
+            // First op is always SetData on /controller_epoch
+            val setDataResult = response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult]
+            Some(setDataResult.getStat.getVersion)
+          case Code.BADVERSION =>
+            info(s"The controller epoch changed $failureSuffix")
+            None
+          case Code.NONODE =>
+            info(s"The ephemeral node at ${ControllerZNode.path} went away $failureSuffix")
+            None
+          case Code.NODEEXISTS =>
+            info(s"The ephemeral node at ${ControllerZNode.path} was created by another controller $failureSuffix")
+            None
+          case code =>
+            error(s"ZooKeeper had an error $failureSuffix")
+            throw KeeperException.create(code)
+        }
+    }
+  }
+
   private def maybeCreateControllerEpochZNode(): (Int, Int) = {
     createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match {
       case Code.OK =>
@@ -340,6 +418,24 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
+  def getEntitiesConfigs(rootEntityType: String, sanitizedEntityNames: Set[String]): Map[String, Properties] = {
+    val getDataRequests: Seq[GetDataRequest] = sanitizedEntityNames.map { entityName =>
+      GetDataRequest(ConfigEntityZNode.path(rootEntityType, entityName), Some(entityName))
+    }.toSeq
+
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+    getDataResponses.map { response =>
+      val entityName = response.ctx.get.asInstanceOf[String]
+      response.resultCode match {
+        case Code.OK =>
+          entityName -> ConfigEntityZNode.decode(response.data)
+        case Code.NONODE =>
+          entityName -> new Properties()
+        case _ => throw response.resultException.get
+      }
+    }.toMap
+  }
+
   /**
    * Sets or creates the entity znode path with the given configs depending
    * on whether it already exists or not.
@@ -1554,6 +1650,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
+  def getOrCreateMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val getDataRequest = GetDataRequest(MigrationZNode.path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    getDataResponse.resultCode match {
+      case Code.OK =>
+        MigrationZNode.decode(getDataResponse.data, getDataResponse.stat.getVersion, getDataResponse.stat.getMtime)
+      case Code.NONODE =>
+        createInitialMigrationState(initialState)
+      case _ => throw getDataResponse.resultException.get
+    }
+  }
+
+  def createInitialMigrationState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val createRequest = CreateRequest(
+      MigrationZNode.path,
+      MigrationZNode.encode(initialState),
+      defaultAcls(MigrationZNode.path),
+      CreateMode.PERSISTENT)
+    val response = retryRequestUntilConnected(createRequest)
+    response.maybeThrow()
+    initialState.withMigrationZkVersion(0)
+  }
+
+  def updateMigrationState(migrationState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val req = SetDataRequest(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
+    val resp = retryRequestUntilConnected(req)
+    resp.maybeThrow()
+    migrationState.withMigrationZkVersion(resp.stat.getVersion)
+  }
+
   /**
     * Return the ACLs of the node of the given path
     * @param path the given path for the node
@@ -1772,6 +1898,137 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
+  /**
+   * Safely performs a sequence of writes to ZooKeeper as part of a KRaft migration. For each request in {@code requests}, we
+   * wrap the operation in a multi-op transaction that includes a check op on /controller_epoch and /migration. This ensures
+   * that no other KRaft controller or ZK controller has unexpectedly taken leadership.
+   *
+   * In cases of KRaft failover during a migration, it is possible that a write is attempted before the old KRaft controller
+   * receives the new leader information. In this case, the check op on /migration acts as a guard against multiple writers.
+   *
+   * The multi-op for the last request in {@code requests} is used to update the /migration node with the latest migration
+   * state. This effectively checkpoints the progress of the migration in ZK relative to the metadata log.
+   *
+   * Each multi-op request is atomic. The overall sequence of multi-op requests is not atomic and we may fail during any
+   * of them. When the KRaft controller recovers the migration state, it will re-apply all of the writes needed to update
+   * the ZK state with the latest KRaft state. In the case of Create or Delete operations, these will fail if applied
+   * twice, so we need to ignore NodeExists and NoNode failures for those cases.
+   *
+   * @param requests  A sequence of ZK requests. Only Create, Delete, and SetData are supported.
+   * @param migrationState The current migration state. This is written out as part of the final multi-op request.
+   * @return  The new version of /migration ZNode and the sequence of responses for the given requests.
+   */
+  def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req],
+                                                                migrationState: ZkMigrationLeadershipState): (Int, Seq[Req#Response]) = {
+
+    if (requests.isEmpty) {
+      return (migrationState.migrationZkVersion(), Seq.empty)
+    }
+
+    def wrapMigrationRequest(request: Req, lastRequestInBatch: Boolean): MultiRequest = {
+      // Wrap a single request with the multi-op transactional request.
+      val checkOp = CheckOp(ControllerEpochZNode.path, migrationState.controllerZkVersion())
+      val migrationOp = if (lastRequestInBatch) {
+        SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
+      } else {
+        CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
+      }
+
+      request match {
+        case CreateRequest(path, data, acl, createMode, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, CreateOp(path, data, acl, createMode)), ctx)
+        case DeleteRequest(path, version, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, DeleteOp(path, version)), ctx)
+        case SetDataRequest(path, data, version, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, version)), ctx)
+        case _ => throw new IllegalStateException(s"$request does not need controller epoch check")
+      }
+    }
+
+    def handleUnwrappedMigrationResult(migrationOp: ZkOp, migrationResult: OpResult): Int = {
+      // Handle just the operation that updated /migration ZNode
+      val (path: String, data: Option[Array[Byte]], version: Int) = migrationOp match {
+        case CheckOp(path, version) => (path, None, version)
+        case SetDataOp(path, data, version) => (path, Some(data), version)
+        case _ => throw new IllegalStateException("Unexpected result on /migration znode")
+      }
+
+      migrationResult match {
+        case _: CheckResult => version
+        case setDataResult: SetDataResult => setDataResult.getStat.getVersion
+        case errorResult: ErrorResult =>
+          if (path.equals(MigrationZNode.path)) {
+            val errorCode = Code.get(errorResult.getErr)
+            if (errorCode == Code.BADVERSION) {
+              data match {
+                case Some(value) =>
+                  val failedPayload = MigrationZNode.decode(value, version, -1)
+                  throw new RuntimeException(
+                    s"Conditional update on KRaft Migration ZNode failed. Expected zkVersion = ${version}. The failed " +
+                    s"write was: ${failedPayload}. This indicates that another KRaft controller is making writes to ZooKeeper.")
+                case None =>
+                  throw new RuntimeException(s"Check op on KRaft Migration ZNode failed. Expected zkVersion = ${version}. " +
+                    s"This indicates that another KRaft controller is making writes to ZooKeeper.")
+              }
+            } else if (errorCode == Code.OK) {
+              // This means the Check or SetData op would have been ok, but failed because of another operation in this multi-op
+              version
+            } else {
+              throw KeeperException.create(errorCode, path)
+            }
+          } else {
+            throw new RuntimeException(s"Got migration result for incorrect path $path")
+          }
+        case _ => throw new RuntimeException(
+          s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw ${migrationResult}")
+      }
+    }
+
+    def unwrapMigrationResponse(response: AsyncResponse, lastRequestInBatch: Boolean): (AsyncResponse, Int) = {
+      response match {
+        case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
+        zkOpResults match {
+          case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: CheckOp, migrationResult), zkOpResult) =>
+            // Matches all requests except for the last one (CheckOp on /migration)
+            if (lastRequestInBatch) {
+              throw new IllegalStateException("Should not see a Check operation on /migration in the last request.")
+            }
+            handleUnwrappedCheckOp(checkOp, checkOpResult)
+            val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
+            (handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
+          case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), ZkOpResult(migrationOp: SetDataOp, migrationResult), zkOpResult) =>
+            // Matches the last request in a batch (SetDataOp on /migration)
+            if (!lastRequestInBatch) {
+              throw new IllegalStateException("Should only see a SetData operation on /migration in the last request.")
+            }
+            handleUnwrappedCheckOp(checkOp, checkOpResult)
+            val migrationVersion = handleUnwrappedMigrationResult(migrationOp, migrationResult)
+            (handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata), migrationVersion)
+          case null => throw KeeperException.create(resultCode)
+          case _ => throw new IllegalStateException(
+            s"Cannot unwrap $response because it does not contain the expected operations for a migration operation.")
+        }
+        case _ => throw new IllegalStateException(s"Cannot unwrap $response because it is not a MultiResponse")
+      }
+    }
+
+    migrationState.controllerZkVersion() match {
+      case ZkVersion.MatchAnyVersion => throw new IllegalArgumentException(
+        s"Expected a controller epoch zkVersion when making migration writes, not -1.")
+      case version if version >= 0 =>
+        logger.trace(s"Performing ${requests.size} migration update(s) with migrationState=$migrationState")
+        val wrappedRequests = requests.map(req => wrapMigrationRequest(req, req == requests.last))
+        val results = retryRequestsUntilConnected(wrappedRequests)
+        val unwrappedResults = results.map(resp => unwrapMigrationResponse(resp, resp == results.last))
+        val migrationZkVersion = unwrappedResults.last._2
+        // Return the new version of /migration and the sequence of responses to the original requests
+        (migrationZkVersion, unwrappedResults.map(_._1.asInstanceOf[Req#Response]))
+      case invalidVersion =>
+        throw new IllegalArgumentException(
+          s"Expected controller epoch zkVersion $invalidVersion should be non-negative or equal to ${ZkVersion.MatchAnyVersion}")
+    }
+  }
+
   private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
     val remainingRequests = new mutable.ArrayBuffer(requests.size) ++= requests
     val responses = new mutable.ArrayBuffer[Req#Response]
@@ -1997,6 +2254,45 @@ object KafkaZkClient {
       }
   }
 
+  private def handleUnwrappedCheckOp(checkOp: CheckOp, checkOpResult: OpResult): Unit = {
+    checkOpResult match {
+      case errorResult: ErrorResult =>
+        if (checkOp.path.equals(ControllerEpochZNode.path)) {
+          val errorCode = Code.get(errorResult.getErr)
+          if (errorCode == Code.BADVERSION)
+          // Throw ControllerMovedException when the zkVersionCheck is performed on the controller epoch znode and the check fails
+            throw new ControllerMovedException(s"Controller epoch zkVersion check fails. Expected zkVersion = ${checkOp.version}")
+          else if (errorCode != Code.OK)
+            throw KeeperException.create(errorCode, checkOp.path)
+        }
+      case _ =>
+    }
+  }
+
+  private def handleUnwrappedZkOp(zkOpResult: ZkOpResult,
+                                  resultCode: Code,
+                                  ctx: Option[Any],
+                                  responseMetadata: ResponseMetadata): AsyncResponse = {
+    val rawOpResult = zkOpResult.rawOpResult
+    zkOpResult.zkOp match {
+      case createOp: CreateOp =>
+        val name = rawOpResult match {
+          case c: CreateResult => c.getPath
+          case _ => null
+        }
+        CreateResponse(resultCode, createOp.path, ctx, name, responseMetadata)
+      case deleteOp: DeleteOp =>
+        DeleteResponse(resultCode, deleteOp.path, ctx, responseMetadata)
+      case setDataOp: SetDataOp =>
+        val stat = rawOpResult match {
+          case s: SetDataResult => s.getStat
+          case _ => null
+        }
+        SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata)
+      case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp")
+    }
+  }
+
   // A helper function to transform a MultiResponse with the check on
   // controller epoch znode zkVersion back into a regular response.
   // ControllerMovedException will be thrown if the controller epoch
@@ -2006,37 +2302,10 @@ object KafkaZkClient {
     response match {
       case MultiResponse(resultCode, _, ctx, zkOpResults, responseMetadata) =>
         zkOpResults match {
+          // In normal ZK writes, we just have a MultiOp with a CheckOp and the actual operation we're performing
           case Seq(ZkOpResult(checkOp: CheckOp, checkOpResult), zkOpResult) =>
-            checkOpResult match {
-              case errorResult: ErrorResult =>
-                if (checkOp.path.equals(ControllerEpochZNode.path)) {
-                  val errorCode = Code.get(errorResult.getErr)
-                  if (errorCode == Code.BADVERSION)
-                  // Throw ControllerMovedException when the zkVersionCheck is performed on the controller epoch znode and the check fails
-                    throw new ControllerMovedException(s"Controller epoch zkVersion check fails. Expected zkVersion = ${checkOp.version}")
-                  else if (errorCode != Code.OK)
-                    throw KeeperException.create(errorCode, checkOp.path)
-                }
-              case _ =>
-            }
-            val rawOpResult = zkOpResult.rawOpResult
-            zkOpResult.zkOp match {
-              case createOp: CreateOp =>
-                val name = rawOpResult match {
-                  case c: CreateResult => c.getPath
-                  case _ => null
-                }
-                CreateResponse(resultCode, createOp.path, ctx, name, responseMetadata)
-              case deleteOp: DeleteOp =>
-                DeleteResponse(resultCode, deleteOp.path, ctx, responseMetadata)
-              case setDataOp: SetDataOp =>
-                val stat = rawOpResult match {
-                  case s: SetDataResult => s.getStat
-                  case _ => null
-                }
-                SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata)
-              case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp")
-            }
+            handleUnwrappedCheckOp(checkOp, checkOpResult)
+            handleUnwrappedZkOp(zkOpResult, resultCode, ctx, responseMetadata)
           case null => throw KeeperException.create(resultCode)
           case _ => throw new IllegalStateException(s"Cannot unwrap $response because the first zookeeper op is not check op in original MultiRequest")
         }
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 6bd3b19d7dc..84b767d4d3b 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -1048,7 +1048,14 @@ object MigrationZNode {
       val controllerEpoch = js("kraft_controller_epoch").to[Int]
       val metadataOffset = js("kraft_metadata_offset").to[Long]
       val metadataEpoch = js("kraft_metadata_epoch").to[Int]
-      Some(new ZkMigrationLeadershipState(controllerId, controllerEpoch, metadataOffset, metadataEpoch, modifyTimeMs, zkVersion, -2))
+      Some(new ZkMigrationLeadershipState(
+        controllerId,
+        controllerEpoch,
+        metadataOffset,
+        metadataEpoch,
+        modifyTimeMs,
+        zkVersion,
+        ZkVersion.UnknownVersion))
     }.getOrElse(throw new KafkaException(s"Failed to parse the migration json $jsonDataAsString"))
   }
 }
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
new file mode 100644
index 00000000000..77f46b9c794
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -0,0 +1,441 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
+import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import java.util
+import java.util.Properties
+import java.util.function.Consumer
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+
+class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Logging {
+
+  override def getOrCreateMigrationRecoveryState(initialState: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    zkClient.createTopLevelPaths()
+    zkClient.getOrCreateMigrationState(initialState)
+  }
+
+  override def setMigrationRecoveryState(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    zkClient.updateMigrationState(state)
+  }
+
+  override def claimControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController(
+      state.kraftControllerId(), state.kraftControllerEpoch())
+    if (epochZkVersionOpt.isDefined) {
+      state.withControllerZkVersion(epochZkVersionOpt.get)
+    } else {
+      state.withControllerZkVersion(-1)
+    }
+  }
+
+  override def releaseControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    try {
+      zkClient.deleteController(state.controllerZkVersion())
+      state.withControllerZkVersion(-1)
+    } catch {
+      case _: ControllerMovedException =>
+        // If the controller moved, no need to release
+        state.withControllerZkVersion(-1)
+      case t: Throwable =>
+        throw new RuntimeException("Could not release controller leadership due to underlying error", t)
+    }
+  }
+
+  def migrateTopics(metadataVersion: MetadataVersion,
+                    recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+                    brokerIdConsumer: Consumer[Integer]): Unit = {
+    val topics = zkClient.getAllTopicsInCluster()
+    val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
+      val partitions = partitionAssignments.keys.toSeq
+      val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
+      val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+      topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+        .setName(topic)
+        .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+
+      partitionAssignments.foreach { case (topicPartition, replicaAssignment) =>
+        replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
+        replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
+        val replicaList = replicaAssignment.replicas.map(Integer.valueOf).asJava
+        val record = new PartitionRecord()
+          .setTopicId(topicIdOpt.get)
+          .setPartitionId(topicPartition.partition)
+          .setReplicas(replicaList)
+          .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
+          .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
+        leaderIsrAndControllerEpochs.get(topicPartition) match {
+          case Some(leaderIsrAndEpoch) => record
+              .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+              .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
+              .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
+              .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
+              .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value())
+          case None =>
+            warn(s"Could not find partition state in ZK for $topicPartition. Initializing this partition " +
+              s"with ISR={$replicaList} and leaderEpoch=0.")
+            record
+              .setIsr(replicaList)
+              .setLeader(replicaList.get(0))
+              .setLeaderEpoch(0)
+              .setPartitionEpoch(0)
+              .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
+        }
+        topicBatch.add(new ApiMessageAndVersion(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      val props = topicConfigs(topic)
+      props.forEach { case (key: Object, value: Object) =>
+        topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.TOPIC.id)
+          .setResourceName(topic)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+      recordConsumer.accept(topicBatch)
+    }
+  }
+
+  def migrateBrokerConfigs(metadataVersion: MetadataVersion,
+                           recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+    val batch = new util.ArrayList[ApiMessageAndVersion]()
+    zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
+      val brokerResource = if (broker == ConfigEntityName.Default) {
+        ""
+      } else {
+        broker
+      }
+      props.forEach { case (key: Object, value: Object) =>
+        batch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.BROKER.id)
+          .setResourceName(brokerResource)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+    }
+    if (!batch.isEmpty) {
+      recordConsumer.accept(batch)
+    }
+  }
+
+  def migrateClientQuotas(metadataVersion: MetadataVersion,
+                          recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val adminZkClient = new AdminZkClient(zkClient)
+
+    def migrateEntityType(entityType: String): Unit = {
+      adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
+        val entity = new EntityData().setEntityType(entityType).setEntityName(name)
+        val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+          batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+            .setEntity(List(entity).asJava)
+            .setKey(key)
+            .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+        }
+        recordConsumer.accept(batch)
+      }
+    }
+
+    migrateEntityType(ConfigType.User)
+    migrateEntityType(ConfigType.Client)
+    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+      // Taken from ZkAdminManager
+      val components = name.split("/")
+      if (components.size != 3 || components(1) != "clients")
+        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+      val entity = List(
+        new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+        new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+      )
+
+      val batch = new util.ArrayList[ApiMessageAndVersion]()
+      ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+        batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+          .setEntity(entity.asJava)
+          .setKey(key)
+          .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+      recordConsumer.accept(batch)
+    }
+
+    migrateEntityType(ConfigType.Ip)
+  }
+
+  def migrateProducerId(metadataVersion: MetadataVersion,
+                        recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+    dataOpt match {
+      case Some(data) =>
+        val producerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+        recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
+          .setBrokerEpoch(-1)
+          .setBrokerId(producerIdBlock.assignedBrokerId)
+          .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+      case None => // Nothing to migrate
+    }
+  }
+
+  override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]],
+                               brokerIdConsumer: Consumer[Integer]): Unit = {
+    migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
+    migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
+    migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
+    migrateProducerId(MetadataVersion.latest(), batchConsumer)
+  }
+
+  override def readBrokerIds(): util.Set[Integer] = {
+    zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
+  }
+
+  override def readBrokerIdsFromTopicAssignments(): util.Set[Integer] = {
+    val topics = zkClient.getAllTopicsInCluster()
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    val brokersWithAssignments = new util.HashSet[Integer]()
+    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>
+      assignments.values.foreach { assignment =>
+        assignment.replicas.foreach { brokerId => brokersWithAssignments.add(brokerId) }
+      }
+    }
+    brokersWithAssignments
+  }
+
+  override def createTopic(topicName: String,
+                           topicId: Uuid,
+                           partitions: util.Map[Integer, PartitionRegistration],
+                           state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val assignments = partitions.asScala.map { case (partitionId, partition) =>
+      new TopicPartition(topicName, partitionId) ->
+        ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
+    }
+
+    val createTopicZNode = {
+      val path = TopicZNode.path(topicName)
+      CreateRequest(
+        path,
+        TopicZNode.encode(Some(topicId), assignments),
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+    val createPartitionsZNode = {
+      val path = TopicPartitionsZNode.path(topicName)
+      CreateRequest(
+        path,
+        null,
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+
+    val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
+      val topicPartition = new TopicPartition(topicName, partitionId)
+      Seq(
+        createTopicPartition(topicPartition),
+        createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
+      )
+    }
+
+    val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
+    val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
+    state.withMigrationZkVersion(migrationZkVersion)
+  }
+
+  private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
+    val path = TopicPartitionZNode.path(topicPartition)
+    CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def partitionStatePathAndData(topicPartition: TopicPartition,
+                                        partitionRegistration: PartitionRegistration,
+                                        controllerEpoch: Int): (String, Array[Byte]) = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    (path, data)
+  }
+
+  private def createTopicPartitionState(topicPartition: TopicPartition,
+                                        partitionRegistration: PartitionRegistration,
+                                        controllerEpoch: Int): CreateRequest = {
+    val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
+    CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def updateTopicPartitionState(topicPartition: TopicPartition,
+                                        partitionRegistration: PartitionRegistration,
+                                        controllerEpoch: Int): SetDataRequest = {
+    val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
+    SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
+  }
+
+  override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
+                                     state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
+      partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
+        val topicPartition = new TopicPartition(topicName, partitionId)
+        Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
+      }
+    }
+    if (requests.isEmpty) {
+      state
+    } else {
+      val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state)
+      state.withMigrationZkVersion(migrationZkVersion)
+    }
+  }
+
+  // Try to update an entity config and the migration state. If NoNode is encountered, it probably means we
+  // need to recursively create the parent ZNode. In this case, return None.
+  def tryWriteEntityConfig(entityType: String,
+                           path: String,
+                           props: Properties,
+                           create: Boolean,
+                           state: ZkMigrationLeadershipState): Option[ZkMigrationLeadershipState] = {
+    val configData = ConfigEntityZNode.encode(props)
+
+    val requests = if (create) {
+      Seq(CreateRequest(ConfigEntityZNode.path(entityType, path), configData, zkClient.defaultAcls(path), CreateMode.PERSISTENT))
+    } else {
+      Seq(SetDataRequest(ConfigEntityZNode.path(entityType, path), configData, ZkVersion.MatchAnyVersion))
+    }
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state)
+    if (!create && responses.head.resultCode.equals(Code.NONODE)) {
+      // Not fatal. Just means we need to Create this node instead of SetData
+      None
+    } else if (responses.head.resultCode.equals(Code.OK)) {
+      Some(state.withMigrationZkVersion(migrationZkVersion))
+    } else {
+      throw KeeperException.create(responses.head.resultCode, path)
+    }
+  }
+
+  def writeClientQuotas(entity: ClientQuotaEntity,
+                        quotas: util.Map[String, Double],
+                        state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val entityMap = entity.entries().asScala
+    val hasUser = entityMap.contains(ConfigType.User)
+    val hasClient = entityMap.contains(ConfigType.Client)
+    val hasIp = entityMap.contains(ConfigType.Ip)
+    val props = new Properties()
+    // We store client quota values as strings in the ZK JSON
+    quotas.forEach { case (key, value) => props.put(key, value.toString) }
+    val (configType, path) = if (hasUser && !hasClient) {
+      (Some(ConfigType.User), Some(entityMap(ConfigType.User)))
+    } else if (hasUser && hasClient) {
+      (Some(ConfigType.User), Some(s"${entityMap(ConfigType.User)}/clients/${entityMap(ConfigType.Client)}"))
+    } else if (hasClient) {
+      (Some(ConfigType.Client), Some(entityMap(ConfigType.Client)))
+    } else if (hasIp) {
+      (Some(ConfigType.Ip), Some(entityMap(ConfigType.Ip)))
+    } else {
+      (None, None)
+    }
+
+    if (path.isEmpty) {
+      error(s"Skipping unknown client quota entity $entity")
+      return state
+    }
+
+    // Try to write the client quota configs once with create=false, and again with create=true if the first operation fails
+    tryWriteEntityConfig(configType.get, path.get, props, create=false, state) match {
+      case Some(newState) =>
+        newState
+      case None =>
+        // If we didn't update the migration state, we failed to write the client quota. Try again
+        // after recursively creating its parent znodes
+        val createPath = if (hasUser && hasClient) {
+          s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ConfigType.User)}/clients"
+        } else {
+          ConfigEntityTypeZNode.path(configType.get)
+        }
+        zkClient.createRecursive(createPath, throwIfPathExists=false)
+        debug(s"Recursively creating ZNode $createPath and attempting to write $entity quotas a second time.")
+
+        tryWriteEntityConfig(configType.get, path.get, props, create=true, state) match {
+          case Some(newStateSecondTry) => newStateSecondTry
+          case None => throw new RuntimeException(
+            s"Could not write client quotas for $entity on second attempt when using Create instead of SetData")
+        }
+    }
+  }
+
+  def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(
+      new ProducerIdsBlock(-1, nextProducerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE))
+
+    val request = SetDataRequest(ProducerIdBlockZNode.path, newProducerIdBlockData, ZkVersion.MatchAnyVersion)
+    val (migrationZkVersion, _) = zkClient.retryMigrationRequestsUntilConnected(Seq(request), state)
+    state.withMigrationZkVersion(migrationZkVersion)
+  }
+
+  def writeConfigs(resource: ConfigResource,
+                   configs: util.Map[String, String],
+                   state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+    val configType = resource.`type`() match {
+      case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
+      case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
+      case _ => None
+    }
+
+    val configName = resource.name()
+    if (configType.isDefined) {
+      val props = new Properties()
+      configs.forEach { case (key, value) => props.put(key, value) }
+      tryWriteEntityConfig(configType.get, configName, props, create=false, state) match {
+        case Some(newState) =>
+          newState
+        case None =>
+          val createPath = ConfigEntityTypeZNode.path(configType.get)
+          debug(s"Recursively creating ZNode $createPath and attempting to write $resource configs a second time.")
+          zkClient.createRecursive(createPath, throwIfPathExists=false)
+
+          tryWriteEntityConfig(configType.get, configName, props, create=true, state) match {
+            case Some(newStateSecondTry) => newStateSecondTry
+            case None => throw new RuntimeException(
+              s"Could not write ${configType.get} configs on second attempt when using Create instead of SetData.")
+          }
+      }
+    } else {
+      debug(s"Not updating ZK for $resource since it is not a Broker or Topic entity.")
+      state
+    }
+  }
+}
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
new file mode 100644
index 00000000000..54b1156ccb1
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util
+import java.util.concurrent.TimeUnit
+import scala.jdk.CollectionConverters._
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ZkMigrationIntegrationTest {
+
+  class MetadataDeltaVerifier {
+    val metadataDelta = new MetadataDelta(MetadataImage.EMPTY)
+    var offset = 0
+    def accept(batch: java.util.List[ApiMessageAndVersion]): Unit = {
+      batch.forEach(message => {
+        metadataDelta.replay(offset, 0, message.message())
+        offset += 1
+      })
+    }
+
+    def verify(verifier: MetadataImage => Unit): Unit = {
+      val image = metadataDelta.apply()
+      verifier.apply(image)
+    }
+  }
+
+  @ClusterTest(brokers = 3, clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+  def testMigrate(clusterInstance: ClusterInstance): Unit = {
+    val admin = clusterInstance.createAdminClient()
+    val newTopics = new util.ArrayList[NewTopic]()
+    newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort)
+      .configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava))
+    newTopics.add(new NewTopic("test-topic-2", 1, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+    val createTopicResult = admin.createTopics(newTopics)
+    createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+    val quotas = new util.ArrayList[ClientQuotaAlteration]()
+    quotas.add(new ClientQuotaAlteration(
+      new ClientQuotaEntity(Map("user" -> "user1").asJava),
+      List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
+    quotas.add(new ClientQuotaAlteration(
+      new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
+      List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
+    quotas.add(new ClientQuotaAlteration(
+      new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
+      List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
+    admin.alterClientQuotas(quotas)
+
+    val zkClient = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+    val migrationClient = new ZkMigrationClient(zkClient)
+    var migrationState = migrationClient.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY)
+    migrationState = migrationState.withNewKRaftController(3000, 42)
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+
+    val brokers = new java.util.HashSet[Integer]()
+    val verifier = new MetadataDeltaVerifier()
+    migrationClient.readAllMetadata(batch => verifier.accept(batch), brokerId => brokers.add(brokerId))
+    assertEquals(Seq(0, 1, 2), brokers.asScala.toSeq)
+
+    verifier.verify { image =>
+      assertNotNull(image.topics().getTopic("test-topic-1"))
+      assertEquals(2, image.topics().getTopic("test-topic-1").partitions().size())
+
+      assertNotNull(image.topics().getTopic("test-topic-2"))
+      assertEquals(1, image.topics().getTopic("test-topic-2").partitions().size())
+
+      assertNotNull(image.topics().getTopic("test-topic-3"))
+      assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size())
+
+      val clientQuotas = image.clientQuotas().entities()
+      assertEquals(3, clientQuotas.size())
+    }
+
+    migrationState = migrationClient.releaseControllerLeadership(migrationState)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/ZkAdminManagerTest.scala b/core/src/test/scala/unit/kafka/server/ZkAdminManagerTest.scala
index 2057fda2c52..4fe64622d79 100644
--- a/core/src/test/scala/unit/kafka/server/ZkAdminManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ZkAdminManagerTest.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.util.Properties
-
 import kafka.server.metadata.ZkConfigRepository
 import kafka.utils.TestUtils
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -28,10 +27,7 @@ import org.apache.kafka.common.message.DescribeConfigsRequestData
 import org.apache.kafka.common.message.DescribeConfigsResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.junit.jupiter.api.{AfterEach, Test}
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertNotNull
-import org.junit.jupiter.api.Assertions.assertNotEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertThrows}
 import org.mockito.Mockito.{mock, when}
 
 import scala.jdk.CollectionConverters._
@@ -54,6 +50,28 @@ class ZkAdminManagerTest {
     new ConfigHelper(metadataCache, KafkaConfig.fromProps(props), new ZkConfigRepository(new AdminZkClient(zkClient)))
   }
 
+  @Test
+  def testClientQuotasToProps(): Unit = {
+    val emptyProps = ZkAdminManager.clientQuotaPropsToDoubleMap(Map.empty)
+    assertEquals(0, emptyProps.size)
+
+    val oneProp = ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> "1234"))
+    assertEquals(1, oneProp.size)
+    assertEquals(1234.0, oneProp("foo"))
+
+    // This is probably not desired, but it is kept for compatibility with existing usages
+    val emptyKey = ZkAdminManager.clientQuotaPropsToDoubleMap(Map("" -> "-42.1"))
+    assertEquals(1, emptyKey.size)
+    assertEquals(-42.1, emptyKey(""))
+
+    val manyProps = ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> "1234", "bar" -> "0", "spam" -> "-1234.56"))
+    assertEquals(3, manyProps.size)
+
+    assertThrows(classOf[NullPointerException], () => ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> null)))
+    assertThrows(classOf[IllegalStateException], () => ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> "bar")))
+    assertThrows(classOf[IllegalStateException], () => ZkAdminManager.clientQuotaPropsToDoubleMap(Map("foo" -> "")))
+  }
+
   @Test
   def testDescribeConfigsWithNullConfigurationKeys(): Unit = {
     when(zkClient.getEntityConfigs(ConfigType.Topic, topic)).thenReturn(TestUtils.createBrokerConfig(brokerId, "zk"))
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 7b7ddfbc56f..11eae3386f8 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,7 +19,6 @@ package kafka.zk
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Collections, Properties}
-
 import kafka.api.LeaderAndIsr
 import kafka.cluster.{Broker, EndPoint}
 import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
@@ -43,9 +42,10 @@ import org.apache.kafka.common.security.token.delegation.TokenInformation
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.zookeeper.KeeperException.{Code, NoAuthException, NoNodeException, NodeExistsException}
-import org.apache.zookeeper.ZooDefs
+import org.apache.zookeeper.{CreateMode, ZooDefs}
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.common.ZKConfig
 import org.apache.zookeeper.data.Stat
@@ -1373,6 +1373,53 @@ class KafkaZkClientTest extends QuorumTestHarness {
     } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
   }
 
+  @Test
+  def testFailToUpdateMigrationZNode(): Unit = {
+    val (_, stat) = zkClient.getControllerEpoch.get
+    var migrationState = new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
+    migrationState = zkClient.getOrCreateMigrationState(migrationState)
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    // A batch of migration writes to make. The last one will fail, so the migration ZNode should not be updated
+    val requestsBad = Seq(
+      CreateRequest("/foo", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+      CreateRequest("/foo/bar", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+      CreateRequest("/foo/bar/spam", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+      CreateRequest("/foo", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+    )
+
+    migrationState = migrationState.withControllerZkVersion(stat.getVersion)
+    zkClient.retryMigrationRequestsUntilConnected(requestsBad, migrationState) match {
+      case (zkVersion: Int, requests: Seq[AsyncRequest#Response]) =>
+        assertEquals(0, zkVersion)
+        assert(requests.take(3).forall(resp => resp.resultCode.equals(Code.OK)))
+        assertEquals(Code.NODEEXISTS, requests.last.resultCode)
+      case _ => fail()
+    }
+
+    // Check state again
+    val loadedState = zkClient.getOrCreateMigrationState(ZkMigrationLeadershipState.EMPTY)
+    assertEquals(0, loadedState.migrationZkVersion())
+
+    // Send the same requests again, but with a final request that will succeed this time. The first three will
+    // result in NODEEXISTS, but the migration state should still be updated
+    val requestsGood = Seq(
+      CreateRequest("/foo", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+      CreateRequest("/foo/bar", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+      CreateRequest("/foo/bar/spam", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+      CreateRequest("/foo/bar/eggs", Array(), zkClient.defaultAcls("/foo"), CreateMode.PERSISTENT),
+    )
+
+    migrationState = migrationState.withControllerZkVersion(stat.getVersion)
+    zkClient.retryMigrationRequestsUntilConnected(requestsGood, migrationState) match {
+      case (zkVersion: Int, requests: Seq[AsyncRequest#Response]) =>
+        assertEquals(1, zkVersion)
+        assert(requests.take(3).forall(resp => resp.resultCode.equals(Code.NODEEXISTS)))
+        assertEquals(Code.OK, requests.last.resultCode)
+      case _ => fail()
+    }
+  }
+
   class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
     extends KafkaZkClient(zooKeeperClient, isSecure, time) {
     // Overwriting this method from the parent class to force the client to re-register the Broker.
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
new file mode 100644
index 00000000000..7fae24f650e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.coordinator.transaction.ProducerIdManager
+import kafka.server.{ConfigType, QuorumTestHarness, ZkAdminManager}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.config.internals.QuotaConfigs
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, ProducerIdsRecord}
+import org.apache.kafka.common.quota.ClientQuotaEntity
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
+import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
+import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+
+import java.util.Properties
+import scala.collection.Map
+import scala.jdk.CollectionConverters._
+
+/**
+ * ZooKeeper integration tests that verify the interoperability of KafkaZkClient and ZkMigrationClient.
+ */
+class ZkMigrationClientTest extends QuorumTestHarness {
+
+  private var migrationClient: ZkMigrationClient = _
+
+  private var migrationState: ZkMigrationLeadershipState = _
+
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
+    zkClient.createControllerEpochRaw(1)
+
+    migrationClient = new ZkMigrationClient(zkClient)
+    migrationState = initialMigrationState
+    migrationState = migrationClient.getOrCreateMigrationRecoveryState(migrationState)
+   }
+
+  private def initialMigrationState: ZkMigrationLeadershipState = {
+    val (_, stat) = zkClient.getControllerEpoch.get
+    new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
+  }
+
+  @Test
+  def testMigrateEmptyZk(): Unit = {
+    val brokers = new java.util.ArrayList[Integer]()
+    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+
+    migrationClient.readAllMetadata(batch => batches.add(batch), brokerId => brokers.add(brokerId))
+    assertEquals(0, brokers.size())
+    assertEquals(0, batches.size())
+  }
+
+  @Test
+  def testEmptyWrite(): Unit = {
+    val (zkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(), migrationState)
+    assertEquals(migrationState.migrationZkVersion(), zkVersion)
+    assertTrue(responses.isEmpty)
+  }
+
+  @Test
+  def testUpdateExistingPartitions(): Unit = {
+    // Create a topic and partition state in ZK like KafkaController would
+    val assignment = Map(
+      new TopicPartition("test", 0) -> List(0, 1, 2),
+      new TopicPartition("test", 1) -> List(1, 2, 3)
+    )
+    zkClient.createTopicAssignment("test", Some(Uuid.randomUuid()), assignment)
+
+    val leaderAndIsrs = Map(
+      new TopicPartition("test", 0) -> LeaderIsrAndControllerEpoch(
+        new LeaderAndIsr(0, 5, List(0, 1, 2), LeaderRecoveryState.RECOVERED, -1), 1),
+      new TopicPartition("test", 1) -> LeaderIsrAndControllerEpoch(
+        new LeaderAndIsr(1, 5, List(1, 2, 3), LeaderRecoveryState.RECOVERED, -1), 1)
+    )
+    zkClient.createTopicPartitionStatesRaw(leaderAndIsrs, 0)
+
+    // Now verify that we can update it with migration client
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val partitions = Map(
+      0 -> new PartitionRegistration(Array(0, 1, 2), Array(1, 2), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 6, -1),
+      1 -> new PartitionRegistration(Array(1, 2, 3), Array(3), Array(), Array(), 3, LeaderRecoveryState.RECOVERED, 7, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    migrationState = migrationClient.updateTopicPartitions(Map("test" -> partitions).asJava, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    // Read back with Zk client
+    val partition0 = zkClient.getTopicPartitionState(new TopicPartition("test", 0)).get.leaderAndIsr
+    assertEquals(1, partition0.leader)
+    assertEquals(6, partition0.leaderEpoch)
+    assertEquals(List(1, 2), partition0.isr)
+
+    val partition1 = zkClient.getTopicPartitionState(new TopicPartition("test", 1)).get.leaderAndIsr
+    assertEquals(3, partition1.leader)
+    assertEquals(7, partition1.leaderEpoch)
+    assertEquals(List(3), partition1.isr)
+  }
+
+  @Test
+  def testCreateNewPartitions(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val partitions = Map(
+      0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+      1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    migrationState = migrationClient.createTopic("test", Uuid.randomUuid(), partitions, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    // Read back with Zk client
+    val partition0 = zkClient.getTopicPartitionState(new TopicPartition("test", 0)).get.leaderAndIsr
+    assertEquals(0, partition0.leader)
+    assertEquals(0, partition0.leaderEpoch)
+    assertEquals(List(0, 1, 2), partition0.isr)
+
+    val partition1 = zkClient.getTopicPartitionState(new TopicPartition("test", 1)).get.leaderAndIsr
+    assertEquals(1, partition1.leader)
+    assertEquals(0, partition1.leaderEpoch)
+    assertEquals(List(1, 2, 3), partition1.isr)
+  }
+
+  // Write Client Quotas using ZkMigrationClient and read them back using AdminZkClient
+  private def writeClientQuotaAndVerify(migrationClient: ZkMigrationClient,
+                                        adminZkClient: AdminZkClient,
+                                        migrationState: ZkMigrationLeadershipState,
+                                        entity: Map[String, String],
+                                        quotas: Map[String, Double],
+                                        zkEntityType: String,
+                                        zkEntityName: String): ZkMigrationLeadershipState = {
+    val nextMigrationState = migrationClient.writeClientQuotas(
+      new ClientQuotaEntity(entity.asJava),
+      quotas.asJava,
+      migrationState)
+    val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
+      adminZkClient.fetchEntityConfig(zkEntityType, zkEntityName).asScala)
+    assertEquals(quotas, newProps)
+    nextMigrationState
+  }
+
+  @Test
+  def testWriteExistingClientQuotas(): Unit = {
+    val props = new Properties()
+    props.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "100000")
+    adminZkClient.changeConfigs(ConfigType.User, "user1", props)
+    adminZkClient.changeConfigs(ConfigType.User, "user1/clients/clientA", props)
+
+    assertEquals(0, migrationState.migrationZkVersion())
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ConfigType.User -> "user1"),
+      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
+      ConfigType.User, "user1")
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ConfigType.User -> "user1"),
+      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
+      ConfigType.User, "user1")
+    assertEquals(2, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ConfigType.User -> "user1"),
+      Map.empty,
+      ConfigType.User, "user1")
+    assertEquals(3, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ConfigType.User -> "user1"),
+      Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
+      ConfigType.User, "user1")
+    assertEquals(4, migrationState.migrationZkVersion())
+  }
+
+  @Test
+  def testWriteNewClientQuotas(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ConfigType.User -> "user2"),
+      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
+      ConfigType.User, "user2")
+
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
+      Map(ConfigType.User -> "user2", ConfigType.Client -> "clientA"),
+      Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
+      ConfigType.User, "user2/clients/clientA")
+
+    assertEquals(2, migrationState.migrationZkVersion())
+  }
+
+  @Test
+  def testClaimAbsentController(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(1, migrationState.controllerZkVersion())
+  }
+
+  @Test
+  def testExistingKRaftControllerClaim(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(1, migrationState.controllerZkVersion())
+
+    // We don't require a KRaft controller to release the controller in ZK before another KRaft controller
+    // can claim it. This is because KRaft leadership comes from Raft and we are just synchronizing it to ZK.
+    var otherNodeState = new ZkMigrationLeadershipState(3001, 43, 100, 42, Time.SYSTEM.milliseconds(), -1, -1)
+    otherNodeState = migrationClient.claimControllerLeadership(otherNodeState)
+    assertEquals(2, otherNodeState.controllerZkVersion())
+    assertEquals(3001, otherNodeState.kraftControllerId())
+    assertEquals(43, otherNodeState.kraftControllerEpoch())
+  }
+
+  @Test
+  def testNonIncreasingKRaftEpoch(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(1, migrationState.controllerZkVersion())
+
+    migrationState = migrationState.withNewKRaftController(3000, 40)
+    val t1 = assertThrows(classOf[IllegalStateException], () => migrationClient.claimControllerLeadership(migrationState))
+    assertEquals("Cannot register KRaft controller 3000 as the active controller in ZK since its epoch 10000040 is not higher than the current ZK epoch 10000042.", t1.getMessage)
+
+    migrationState = migrationState.withNewKRaftController(3000, 42)
+    val t2 = assertThrows(classOf[IllegalStateException], () => migrationClient.claimControllerLeadership(migrationState))
+    assertEquals("Cannot register KRaft controller 3000 as the active controller in ZK since its epoch 10000042 is not higher than the current ZK epoch 10000042.", t2.getMessage)
+  }
+
+  @Test
+  def testClaimAndReleaseExistingController(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val (epoch, zkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(100)
+    assertEquals(2, epoch)
+    assertEquals(1, zkVersion)
+
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(2, migrationState.controllerZkVersion())
+    zkClient.getControllerEpoch match {
+      case Some((kraftEpoch, stat)) =>
+        assertEquals(10000042, kraftEpoch)
+        assertEquals(2, stat.getVersion)
+      case None => fail()
+    }
+    assertEquals(3000, zkClient.getControllerId.get)
+    assertThrows(classOf[ControllerMovedException], () => zkClient.registerControllerAndIncrementControllerEpoch(100))
+
+    migrationState = migrationClient.releaseControllerLeadership(migrationState)
+    val (epoch1, zkVersion1) = zkClient.registerControllerAndIncrementControllerEpoch(100)
+    assertEquals(10000043, epoch1)
+    assertEquals(3, zkVersion1)
+  }
+
+  @Test
+  def testReadAndWriteProducerId(): Unit = {
+    def generateNextProducerIdWithZkAndRead(): Long = {
+      // Generate a producer ID in ZK
+      val manager = ProducerIdManager.zk(1, zkClient)
+      manager.generateProducerId()
+
+      val records = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+      migrationClient.migrateProducerId(MetadataVersion.latest(), batch => records.add(batch))
+      assertEquals(1, records.size())
+      assertEquals(1, records.get(0).size())
+
+      val record = records.get(0).get(0).message().asInstanceOf[ProducerIdsRecord]
+      record.nextProducerId()
+    }
+
+    // Initialize with ZK ProducerIdManager
+    assertEquals(0, generateNextProducerIdWithZkAndRead())
+
+    // Update next producer ID via migration client
+    migrationState = migrationClient.writeProducerId(6000, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    // Switch back to ZK; it should provision the next block
+    assertEquals(7000, generateNextProducerIdWithZkAndRead())
+  }
+
+  @Test
+  def testMigrateTopicConfigs(): Unit = {
+    val props = new Properties()
+    props.put(TopicConfig.FLUSH_MS_CONFIG, "60000")
+    props.put(TopicConfig.RETENTION_MS_CONFIG, "300000")
+    adminZkClient.createTopicWithAssignment("test", props, Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 0), 2 -> Seq(2, 0, 1)), usesTopicId = true)
+
+    val brokers = new java.util.ArrayList[Integer]()
+    val batches = new java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+    migrationClient.migrateTopics(MetadataVersion.latest(), batch => batches.add(batch), brokerId => brokers.add(brokerId))
+    assertEquals(1, batches.size())
+    val configs = batches.get(0)
+      .asScala
+      .map {_.message()}
+      .filter(message => MetadataRecordType.fromId(message.apiKey()).equals(MetadataRecordType.CONFIG_RECORD))
+      .map {_.asInstanceOf[ConfigRecord]}
+      .toSeq
+    assertEquals(2, configs.size)
+    assertEquals(TopicConfig.FLUSH_MS_CONFIG, configs.head.name())
+    assertEquals("60000", configs.head.value())
+    assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
+    assertEquals("300000", configs.last.value())
+  }
+
+  @Test
+  def testWriteNewTopicConfigs(): Unit = {
+    migrationState = migrationClient.writeConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "test"),
+      java.util.Collections.singletonMap(TopicConfig.SEGMENT_MS_CONFIG, "100000"), migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    val newProps = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+    assertEquals(1, newProps.size())
+    assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
+  }
+
+  @Test
+  def testWriteExistingTopicConfigs(): Unit = {
+    val props = new Properties()
+    props.put(TopicConfig.FLUSH_MS_CONFIG, "60000")
+    props.put(TopicConfig.RETENTION_MS_CONFIG, "300000")
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, "test", props)
+
+    migrationState = migrationClient.writeConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "test"),
+      java.util.Collections.singletonMap(TopicConfig.SEGMENT_MS_CONFIG, "100000"), migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    val newProps = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+    assertEquals(1, newProps.size())
+    assertEquals("100000", newProps.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
+  }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
index bbdc1bf9779..3eb3e3c4e95 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
@@ -62,7 +62,8 @@ public final class ClientQuotasImage {
         return entities.isEmpty();
     }
 
-    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+    // Visible for testing
+    public Map<ClientQuotaEntity, ClientQuotaImage> entities() {
         return entities;
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
new file mode 100644
index 00000000000..5eccbc70625
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.migration;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * Methods for interacting with ZooKeeper during a KIP-866 migration. The migration leadership state is stored in
+ * the /migration ZNode. All write operations to ZK during the migration are performed as a multi-op transaction that
+ * also updates the state of /migration.
+ */
+public interface MigrationClient {
+
+    /**
+     * Read or initialize the migration leadership state in ZooKeeper. If the ZNode is absent, the given {@code initialState}
+     * will be written and subsequently returned with the zkVersion of the node. If the ZNode is present, it will be
+     * read and returned.
+     * @param initialState  An initial, empty state to write to ZooKeeper for the migration state.
+     * @return  The existing migration state, or the initial state given.
+     */
+    ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState);
+
+    /**
+     * Overwrite the migration state in ZK. This is done as a conditional update using
+     * {@link ZkMigrationLeadershipState#migrationZkVersion()}. If the conditional update fails, an exception is thrown.
+     * @param state The migration state to persist
+     * @return  The persisted migration state or an exception.
+     */
+    ZkMigrationLeadershipState setMigrationRecoveryState(ZkMigrationLeadershipState state);
+
+    /**
+     * Attempt to claim controller leadership of the cluster in ZooKeeper. This involves overwriting the /controller
+     * and /controller_epoch ZNodes. The epoch given by {@code state} must be greater than the current epoch in ZooKeeper.
+     *
+     * @param state The current migration leadership state
+     * @return      An updated migration leadership state including the version of /controller_epoch ZNode, if the
+     *              leadership claim was successful. Otherwise, return the previous state unmodified.
+     */
+    ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state);
+
+    /**
+     * Release an existing claim on the cluster leadership in ZooKeeper. This involves deleting the /controller ZNode
+     * so that another controller can claim leadership.
+     *
+     * @param state The current migration leadership state.
+     * @return      An updated migration leadership state with controllerZkVersion = 1, or raise an exception if the
+     *              ZooKeeper delete fails.
+     */
+    ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state);
+
+    ZkMigrationLeadershipState createTopic(
+        String topicName,
+        Uuid topicId,
+        Map<Integer, PartitionRegistration> topicPartitions,
+        ZkMigrationLeadershipState state
+    );
+
+    ZkMigrationLeadershipState updateTopicPartitions(
+        Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
+        ZkMigrationLeadershipState state
+    );
+
+    void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
+
+    Set<Integer> readBrokerIds();
+
+    Set<Integer> readBrokerIdsFromTopicAssignments();
+}
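
A minimal sketch of how a caller might drive this interface end to end, using only the methods declared above together with ZkMigrationLeadershipState; runMigrationPass is a placeholder name, and the KRaft-side dual-write logic and error handling are omitted.

    import org.apache.kafka.metadata.migration.{MigrationClient, ZkMigrationLeadershipState}

    def runMigrationPass(client: MigrationClient, kraftControllerId: Int, kraftEpoch: Int): Unit = {
      // Recover (or initialize) the persisted migration state, then record the current KRaft controller
      var state = client.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY)
      state = state.withNewKRaftController(kraftControllerId, kraftEpoch)

      // Take over /controller and /controller_epoch so ZK brokers follow the KRaft controller
      state = client.claimControllerLeadership(state)

      // Stream all ZK metadata out; the record batches would be appended to the KRaft log
      client.readAllMetadata(
        batch => println(s"read ${batch.size()} records"),
        brokerId => println(s"saw ZK broker $brokerId"))

      // Hand the ZK controller role back (for example, when abandoning the migration)
      state = client.releaseControllerLeadership(state)
    }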
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
index c54cf7d0e4f..b6217ee80c2 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
@@ -53,16 +53,22 @@ public class ZkMigrationLeadershipState {
         this.controllerZkVersion = controllerZkVersion;
     }
 
-    public ZkMigrationLeadershipState withZkVersion(int zkVersion) {
+    public ZkMigrationLeadershipState withMigrationZkVersion(int zkVersion) {
         return new ZkMigrationLeadershipState(
-                this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
-                this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.controllerZkVersion);
+            this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
+            this.kraftMetadataEpoch, this.lastUpdatedTimeMs, zkVersion, this.controllerZkVersion);
+    }
+
+    public ZkMigrationLeadershipState withControllerZkVersion(int zkVersion) {
+        return new ZkMigrationLeadershipState(
+            this.kraftControllerId, this.kraftControllerEpoch, this.kraftMetadataOffset,
+            this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, zkVersion);
     }
 
     public ZkMigrationLeadershipState withNewKRaftController(int controllerId, int controllerEpoch) {
         return new ZkMigrationLeadershipState(
-                controllerId, controllerEpoch, this.kraftMetadataOffset,
-                this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.controllerZkVersion);
+            controllerId, controllerEpoch, this.kraftMetadataOffset,
+            this.kraftMetadataEpoch, this.lastUpdatedTimeMs, this.migrationZkVersion, this.controllerZkVersion);
     }
 
     public int kraftControllerId() {
@@ -100,14 +106,14 @@ public class ZkMigrationLeadershipState {
     @Override
     public String toString() {
         return "ZkMigrationLeadershipState{" +
-                "kraftControllerId=" + kraftControllerId +
-                ", kraftControllerEpoch=" + kraftControllerEpoch +
-                ", kraftMetadataOffset=" + kraftMetadataOffset +
-                ", kraftMetadataEpoch=" + kraftMetadataEpoch +
-                ", lastUpdatedTimeMs=" + lastUpdatedTimeMs +
-                ", migrationZkVersion=" + migrationZkVersion +
-                ", controllerZkVersion=" + controllerZkVersion +
-                '}';
+            "kraftControllerId=" + kraftControllerId +
+            ", kraftControllerEpoch=" + kraftControllerEpoch +
+            ", kraftMetadataOffset=" + kraftMetadataOffset +
+            ", kraftMetadataEpoch=" + kraftMetadataEpoch +
+            ", lastUpdatedTimeMs=" + lastUpdatedTimeMs +
+            ", migrationZkVersion=" + migrationZkVersion +
+            ", controllerZkVersion=" + controllerZkVersion +
+            '}';
     }
 
     @Override
@@ -115,11 +121,24 @@ public class ZkMigrationLeadershipState {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         ZkMigrationLeadershipState that = (ZkMigrationLeadershipState) o;
-        return kraftControllerId == that.kraftControllerId && kraftControllerEpoch == that.kraftControllerEpoch && kraftMetadataOffset == that.kraftMetadataOffset && kraftMetadataEpoch == that.kraftMetadataEpoch && lastUpdatedTimeMs == that.lastUpdatedTimeMs && migrationZkVersion == that.migrationZkVersion && controllerZkVersion == that.controllerZkVersion;
+        return kraftControllerId == that.kraftControllerId
+            && kraftControllerEpoch == that.kraftControllerEpoch
+            && kraftMetadataOffset == that.kraftMetadataOffset
+            && kraftMetadataEpoch == that.kraftMetadataEpoch
+            && lastUpdatedTimeMs == that.lastUpdatedTimeMs
+            && migrationZkVersion == that.migrationZkVersion
+            && controllerZkVersion == that.controllerZkVersion;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(kraftControllerId, kraftControllerEpoch, kraftMetadataOffset, kraftMetadataEpoch, lastUpdatedTimeMs, migrationZkVersion, controllerZkVersion);
+        return Objects.hash(
+            kraftControllerId,
+            kraftControllerEpoch,
+            kraftMetadataOffset,
+            kraftMetadataEpoch,
+            lastUpdatedTimeMs,
+            migrationZkVersion,
+            controllerZkVersion);
     }
 }