Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/02 20:40:28 UTC

[GitHub] [kafka] mumrah opened a new pull request, #12815: KIP-866 Part 1

mumrah opened a new pull request, #12815:
URL: https://github.com/apache/kafka/pull/12815

   This PR adds some of the implementation of KIP-866. Since this KIP is still under discussion, this PR should not be merged to `trunk`. 
   
   A few new things are defined here:
   * MigrationClient: an interface in the `metadata` package that abstracts some ZK things we want to do during the migration
   * ZkMigrationClient: a Scala implementation of MigrationClient that has dependencies on KafkaZkClient
   * ZkMetadataConsumer: a callback used by the QuorumController to receive the migrated ZK data as records
   * KRaftMetadataListener: a callback for handling leader changes and records (this is temporary/throw-away code)
   * KRaftMigrationDriver: the main class that manages the state and orchestrates the migration
   
   A ducktape test is also included that creates a topic in ZK mode, and verifies that it later exists in KRaft mode.
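   
   Since the `MigrationClient` interface itself is not quoted in this thread, here is a rough sketch of its approximate shape, reverse-engineered from the overrides in the `ZkMigrationClient` diff quoted later in this thread. Method names and types come from that diff; the grouping and comments are guesses, and a few methods (for example `sendRequestToBroker`) are omitted for brevity.
   
   ```java
   package org.apache.kafka.migration;
   
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   import java.util.Set;
   import java.util.function.Consumer;
   
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.metadata.BrokerRegistration;
   import org.apache.kafka.metadata.PartitionRegistration;
   import org.apache.kafka.server.common.ApiMessageAndVersion;
   
   // Approximate shape only; inferred from the ZkMigrationClient overrides quoted below.
   public interface MigrationClient {
   
       // Callback used to track ZK broker registrations via ZK watches.
       interface BrokerRegistrationListener {
           void onBrokersChange();
           void onBrokerChange(Integer brokerId);
       }
   
       // Durable migration state kept in ZK (offsets, epochs, znode versions).
       MigrationRecoveryState getOrCreateMigrationRecoveryState(MigrationRecoveryState initialState);
       MigrationRecoveryState setMigrationRecoveryState(MigrationRecoveryState state);
   
       // One-shot read of all ZK metadata, delivered as batches of KRaft records.
       void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
   
       // ZK broker liveness tracking.
       void watchZkBrokerRegistrations(BrokerRegistrationListener listener);
       Optional<ZkBrokerRegistration> readBrokerRegistration(int brokerId);
       Set<Integer> readBrokerIds();
       void addZkBroker(int brokerId);
       void removeZkBroker(int brokerId);
   
       // Dual-write operations that mirror KRaft metadata back into ZK.
       MigrationRecoveryState createTopic(String topicName, Uuid topicId,
                                          Map<Integer, PartitionRegistration> partitions,
                                          MigrationRecoveryState state);
       MigrationRecoveryState updateTopicPartitions(Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
                                                    MigrationRecoveryState state);
       MigrationRecoveryState createKRaftBroker(int brokerId, BrokerRegistration registration, MigrationRecoveryState state);
       MigrationRecoveryState updateKRaftBroker(int brokerId, BrokerRegistration registration, MigrationRecoveryState state);
       MigrationRecoveryState removeKRaftBroker(int brokerId, MigrationRecoveryState state);
   }
   ```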



[GitHub] [kafka] akhileshchg commented on a diff in pull request #12815: KIP-866 Part 1

Posted by GitBox <gi...@apache.org>.
akhileshchg commented on code in PR #12815:
URL: https://github.com/apache/kafka/pull/12815#discussion_r1024674845


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1571,6 +1638,10 @@ private void resetToEmptyState() {
      */
     private final FeatureControlManager featureControl;
 
+    private final KRaftMetadataListener listener;
+
+    public final MigrationListener migrationListener = new MigrationListener(); // TODO clean this up

Review Comment:
   These two names get confusing quickly. In my understanding, `KRaftMetadataListener` is for replicating data consistently from the KRaft metadata log to Zookeeper, and `MigrationListener` is for migrating Zookeeper data to the KRaft metadata log.
   
   Can we rename `migrationListener` to `kraftToZkMigrationHandler` or something to that effect?
   and `listener` to `zkToKRaftMigrationHandler`?



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -156,6 +157,76 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     tryCreateControllerZNodeAndIncrementEpoch()
   }
 
+  /**
+   * Registers a given KRaft controller in zookeeper as the active controller. Unlike the ZK equivalent of this method,
+   * this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller
+   * leadership during a KRaft leadership failover.
+   *
+   * This method is called at the beginning of a KRaft migration and during subsequent KRaft leadership changes during
+   * the migration.
+   *
+   * To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
+   * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method,
+   * the multi request transaction will fail and this method will return None.
+   *
+   * @param kraftControllerId ID of the KRaft controller node
+   * @param kraftControllerEpoch Epoch of the KRaft controller node
+   * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller.
+   */
+  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = {

Review Comment:
   Just a design consideration: do you think this should be part of the MigrationClient instead?
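
   For readers following along: the conditional-update algorithm described in the javadoc above can be illustrated with a minimal, self-contained sketch against the raw ZooKeeper client (not the actual KafkaZkClient code). The helper name, the /controller payload, and the error handling are simplified assumptions; the real implementation also has to handle an existing ephemeral /controller znode and connection retries.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Optional;

   import org.apache.zookeeper.CreateMode;
   import org.apache.zookeeper.KeeperException;
   import org.apache.zookeeper.Op;
   import org.apache.zookeeper.OpResult;
   import org.apache.zookeeper.ZooDefs;
   import org.apache.zookeeper.ZooKeeper;

   public class KRaftControllerRegistrationSketch {
       // Hypothetical helper: returns the new zkVersion of /controller_epoch,
       // or empty if another controller won the race.
       static Optional<Integer> tryRegister(ZooKeeper zk,
                                            int kraftControllerId,
                                            int kraftControllerEpoch,
                                            int expectedEpochZkVersion) throws InterruptedException {
           byte[] controllerData = ("{\"brokerid\":" + kraftControllerId + "}").getBytes(StandardCharsets.UTF_8);
           byte[] epochData = Integer.toString(kraftControllerEpoch).getBytes(StandardCharsets.UTF_8);
           try {
               // The ops succeed or fail atomically. The conditional setData on /controller_epoch
               // fails with BADVERSION if a ZK controller was elected concurrently, and creating
               // /controller as a *persistent* znode keeps ZK brokers from reclaiming leadership
               // during a later KRaft leadership failover.
               List<OpResult> results = zk.multi(Arrays.asList(
                   Op.setData("/controller_epoch", epochData, expectedEpochZkVersion),
                   Op.create("/controller", controllerData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
               ));
               OpResult.SetDataResult epochResult = (OpResult.SetDataResult) results.get(0);
               return Optional.of(epochResult.getStat().getVersion());
           } catch (KeeperException e) {
               // Corresponds to the "return None" case in the javadoc above.
               return Optional.empty();
           }
       }
   }
   ```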



##########
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##########
@@ -0,0 +1,359 @@
+package kafka.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk._
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse}
+import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid}
+import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, VersionRange}
+import org.apache.kafka.migration._
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.zookeeper.CreateMode
+
+import java.util
+import java.util.function.Consumer
+import java.util.{Collections, Optional}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
+
+object ZkMigrationClient {
+  def brokerToBrokerRegistration(broker: Broker, epoch: Long): ZkBrokerRegistration = {
+      val registration = new BrokerRegistration(broker.id, epoch, Uuid.ZERO_UUID,

Review Comment:
   Just wondering, do we use the incarnation id for anything?



##########
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##########
@@ -0,0 +1,359 @@
+package kafka.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk._
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse}
+import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid}
+import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, VersionRange}
+import org.apache.kafka.migration._
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.zookeeper.CreateMode
+
+import java.util
+import java.util.function.Consumer
+import java.util.{Collections, Optional}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
+
+object ZkMigrationClient {
+  def brokerToBrokerRegistration(broker: Broker, epoch: Long): ZkBrokerRegistration = {
+      val registration = new BrokerRegistration(broker.id, epoch, Uuid.ZERO_UUID,
+        Collections.emptyList[Endpoint], Collections.emptyMap[String, VersionRange],
+        Optional.empty(), false, false)
+      new ZkBrokerRegistration(registration, null, null, false)
+  }
+}
+
+class ZkMigrationClient(zkClient: KafkaZkClient,
+                        controllerChannelManager: ControllerChannelManager) extends MigrationClient with Logging {
+
+  def claimControllerLeadership(kraftControllerId: Int, kraftControllerEpoch: Int): ZkControllerState = {
+    val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController(kraftControllerId, kraftControllerEpoch)
+    if (epochZkVersionOpt.isDefined) {
+      new ZkControllerState(kraftControllerId, kraftControllerEpoch, epochZkVersionOpt.get)
+    } else {
+      throw new ControllerMovedException("Cannot claim controller leadership, the controller has moved.")
+    }
+  }
+
+  def migrateTopics(metadataVersion: MetadataVersion,
+                    recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+                    brokerIdConsumer: Consumer[Integer]): Unit = {
+    val topics = zkClient.getAllTopicsInCluster()
+    val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
+      val partitions = assignments.keys.toSeq
+      val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
+      val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+      topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+        .setName(topic)
+        .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+
+      assignments.foreach { case (topicPartition, replicaAssignment) =>
+        replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
+        replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
+
+        val leaderIsrAndEpoch = leaderIsrAndControllerEpochs(topicPartition)
+        topicBatch.add(new ApiMessageAndVersion(new PartitionRecord()
+          .setTopicId(topicIdOpt.get)
+          .setPartitionId(topicPartition.partition)
+          .setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
+          .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
+          .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
+          .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+          .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
+          .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
+          .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value()), PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      val props = topicConfigs(topic)
+      props.forEach { case (key: Object, value: Object) =>
+        topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.TOPIC.id)
+          .setResourceName(topic)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      recordConsumer.accept(topicBatch)
+    }
+  }
+
+  def migrateBrokerConfigs(metadataVersion: MetadataVersion,
+                           recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+    val batch = new util.ArrayList[ApiMessageAndVersion]()
+    zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
+      val brokerResource = if (broker == ConfigEntityName.Default) {
+        ""
+      } else {
+        broker
+      }
+      props.forEach { case (key: Object, value: Object) =>
+        batch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.BROKER.id)
+          .setResourceName(brokerResource)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+    }
+    recordConsumer.accept(batch)
+  }
+
+  def migrateClientQuotas(metadataVersion: MetadataVersion,
+                          recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val adminZkClient = new AdminZkClient(zkClient)
+
+    def migrateEntityType(entityType: String): Unit = {
+      adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
+        val entity = new EntityData().setEntityType(entityType).setEntityName(name)
+        val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+          batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+            .setEntity(List(entity).asJava)
+            .setKey(key)
+            .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+        }
+        recordConsumer.accept(batch)
+      }
+    }
+
+    migrateEntityType(ConfigType.User)
+    migrateEntityType(ConfigType.Client)
+    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+      // Lifted from ZkAdminManager
+      val components = name.split("/")
+      if (components.size != 3 || components(1) != "clients")
+        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+      val entity = List(
+        new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+        new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+      )
+
+      val batch = new util.ArrayList[ApiMessageAndVersion]()
+      ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+        batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+          .setEntity(entity.asJava)
+          .setKey(key)
+          .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+      recordConsumer.accept(batch)
+    }
+
+    migrateEntityType(ConfigType.Ip)
+  }
+
+  def migrateProducerId(metadataVersion: MetadataVersion,
+                        recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+    dataOpt match {
+      case Some(data) =>
+        val producerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+        recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
+          .setBrokerEpoch(-1)
+          .setBrokerId(producerIdBlock.assignedBrokerId)
+          .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+      case None => // Nothing to migrate
+    }
+  }
+
+  override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]], brokerIdConsumer: Consumer[Integer]): Unit = {
+    migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
+    migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
+    migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
+    migrateProducerId(MetadataVersion.latest(), batchConsumer)
+  }
+
+  override def watchZkBrokerRegistrations(listener: MigrationClient.BrokerRegistrationListener): Unit = {
+    val brokersHandler = new ZNodeChildChangeHandler() {
+      override val path: String = BrokerIdsZNode.path
+
+      override def handleChildChange(): Unit = listener.onBrokersChange()
+    }
+    System.err.println("Adding /brokers watch")
+    zkClient.registerZNodeChildChangeHandler(brokersHandler)
+
+    def brokerHandler(brokerId: Int): ZNodeChangeHandler = {
+      new ZNodeChangeHandler() {
+        override val path: String = BrokerIdZNode.path(brokerId)
+
+        override def handleDataChange(): Unit = listener.onBrokerChange(brokerId)
+      }
+    }
+
+    val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster()
+    curBrokerAndEpochs.foreach { case (broker, _) =>
+      System.err.println(s"Adding /brokers/${broker.id} watch")
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerHandler(broker.id))
+    }
+
+    listener.onBrokersChange()
+  }
+
+  override def readBrokerRegistration(brokerId: Int): Optional[ZkBrokerRegistration] = {
+    val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
+    if (brokerAndEpoch.isEmpty) {
+      Optional.empty()
+    } else {
+      Optional.of(brokerToBrokerRegistration(brokerAndEpoch.head._1, brokerAndEpoch.head._2))
+    }
+  }
+
+  override def readBrokerIds(): util.Set[Integer] = {
+    zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
+  }
+
+  override def addZkBroker(brokerId: Int): Unit = {
+    val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
+    controllerChannelManager.addBroker(brokerAndEpoch.head._1)
+  }
+
+  override def removeZkBroker(brokerId: Int): Unit = {
+    controllerChannelManager.removeBroker(brokerId)
+  }
+
+  override def getOrCreateMigrationRecoveryState(initialState: MigrationRecoveryState): MigrationRecoveryState = {
+    zkClient.getOrCreateMigrationState(initialState)
+  }
+
+  override def setMigrationRecoveryState(state: MigrationRecoveryState): MigrationRecoveryState = {
+    zkClient.updateMigrationState(state)
+  }
+
+  override def sendRequestToBroker(brokerId: Int,
+                                   request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
+                                   callback: Consumer[AbstractResponse]): Unit = {
+    controllerChannelManager.sendRequest(brokerId, request, callback.accept)
+  }
+
+  override def createTopic(topicName: String, topicId: Uuid, partitions: util.Map[Integer, PartitionRegistration], state: MigrationRecoveryState): MigrationRecoveryState = {
+    val assignments = partitions.asScala.map { case (partitionId, partition) =>
+      new TopicPartition(topicName, partitionId) -> ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
+    }
+
+    val createTopicZNode = {
+      val path = TopicZNode.path(topicName)
+      CreateRequest(
+        path,
+        TopicZNode.encode(Some(topicId), assignments),
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+    val createPartitionsZNode = {
+      val path = TopicPartitionsZNode.path(topicName)
+      CreateRequest(
+        path,
+        null,
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+
+    val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
+      val topicPartition = new TopicPartition(topicName, partitionId)
+      Seq(
+        createTopicPartition(topicPartition),
+        createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
+      )
+    }
+
+    val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+
+  private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
+    val path = TopicPartitionZNode.path(topicPartition)
+    CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def createTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): CreateRequest = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def updateTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): SetDataRequest = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
+  }
+
+  override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
+                                     state: MigrationRecoveryState): MigrationRecoveryState = {
+    val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
+      partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
+        val topicPartition = new TopicPartition(topicName, partitionId)
+        Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
+      }
+    }
+    if (requests.isEmpty) {
+      state
+    } else {
+      val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state.controllerZkVersion(), state)
+      responses.foreach(System.err.println)
+      state.withZkVersion(migrationZkVersion)
+    }
+  }
+
+  override def createKRaftBroker(brokerId: Int, brokerRegistration: BrokerRegistration, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val brokerInfo = BrokerInfo(
+      Broker(
+        id = brokerId,
+        endPoints = brokerRegistration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
+        rack = brokerRegistration.rack().toScala),
+      MetadataVersion.latest(), // TODO ???
+      -1
+    )
+    val req = CreateRequest(brokerInfo.path, brokerInfo.toJsonBytes, zkClient.defaultAcls(brokerInfo.path), CreateMode.PERSISTENT)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+
+  override def updateKRaftBroker(brokerId: Int, brokerRegistration: BrokerRegistration, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val brokerInfo = BrokerInfo(
+      Broker(
+        id = brokerId,
+        endPoints = brokerRegistration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
+        rack = brokerRegistration.rack().toScala),
+      MetadataVersion.latest(), // TODO ???
+      -1
+    )
+    val req = SetDataRequest(BrokerIdZNode.path(brokerId), brokerInfo.toJsonBytes, ZkVersion.MatchAnyVersion)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+
+  override def removeKRaftBroker(brokerId: Int, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val req = DeleteRequest(BrokerIdZNode.path(brokerId), ZkVersion.MatchAnyVersion)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+}

Review Comment:
   Ditto. Why do we need these operations for KRaft brokers?



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -531,8 +537,15 @@ class KafkaServer(
     )
 
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+
+    val migrationInfo = if (config.interBrokerProtocolVersion.isAtLeast(IBP_3_4_IV0)) {
+      Some(BrokerMigration(clusterId, config.interBrokerProtocolVersion, enabled = true))
+    } else {
+      None
+    }

Review Comment:
   Don't we need to check for the migration flag here?



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -1772,6 +1895,106 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
+  // Perform a sequence of updates to ZooKeeper as part of a KRaft dual write. In addition to adding a CheckOp on the
+  // controller epoch ZNode, we also include CheckOp/SetDataOp on the migration ZNode. This ensures proper fencing
+  // from errant ZK controllers as well as fencing from new KRaft controllers.
+  def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req],
+                                                                expectedControllerZkVersion: Int,
+                                                                migrationState: MigrationRecoveryState): (Int, Seq[Req#Response]) = {
+
+    if (requests.isEmpty) {
+      throw new IllegalArgumentException("Must specify at least one ZK request for a migration operation.")
+    }
+    def wrapMigrationRequest(request: Req, updateMigrationNode: Boolean): MultiRequest = {
+      val checkOp = CheckOp(ControllerEpochZNode.path, expectedControllerZkVersion)
+      val migrationOp = if (updateMigrationNode) {
+        SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
+      } else {
+        CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
+      }

Review Comment:
   So we generate Zk writes for a given offset and update the offset at the end of all the successful writes. But if there's a failure in between the Zk writes, I think we have to implement some idempotency for those Zk writes in case we retry.
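
   To make the fencing (and the idempotency concern) concrete, here is a minimal sketch of a single fenced dual-write using the raw ZooKeeper multi API. The paths, helper name, and payloads are illustrative only; the real code goes through KafkaZkClient's own request and op types rather than raw Ops.

   ```java
   import java.util.Arrays;
   import java.util.List;

   import org.apache.zookeeper.CreateMode;
   import org.apache.zookeeper.KeeperException;
   import org.apache.zookeeper.Op;
   import org.apache.zookeeper.OpResult;
   import org.apache.zookeeper.ZooDefs;
   import org.apache.zookeeper.ZooKeeper;

   public class FencedMigrationWriteSketch {
       // Performs one dual-write batch fenced by the ZK controller epoch and the migration znode.
       // Returns the new zkVersion of /migration on success.
       static int fencedCreate(ZooKeeper zk,
                               String path,
                               byte[] data,
                               int expectedControllerEpochZkVersion,
                               byte[] migrationStateData,
                               int expectedMigrationZkVersion) throws InterruptedException, KeeperException {
           List<OpResult> results = zk.multi(Arrays.asList(
               // Fails the whole batch if an errant ZK controller has bumped /controller_epoch.
               Op.check("/controller_epoch", expectedControllerEpochZkVersion),
               // Fails the whole batch if another KRaft controller already advanced the migration state.
               Op.setData("/migration", migrationStateData, expectedMigrationZkVersion),
               // The actual metadata write being mirrored from the KRaft log.
               Op.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
           ));
           return ((OpResult.SetDataResult) results.get(1)).getStat().getVersion();
       }
   }
   ```

   Because each multi is atomic, a failed batch leaves both the target znode and the /migration version untouched and can simply be retried. The trickier case the comment above points at is a retry after some earlier batches already landed: those writes need to be replay-safe in some form (for example, tolerating NODEEXISTS on a retried create), which is essentially the idempotency being asked for.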



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here

Review Comment:
   What is the timeline for this and who's the owner?



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokersChangeEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            Set<Integer> updatedBrokerIds = client.readBrokerIds();
+            Set<Integer> added = new HashSet<>(updatedBrokerIds);
+            added.removeAll(zkBrokerRegistrations.keySet());
+
+            Set<Integer> removed = new HashSet<>(zkBrokerRegistrations.keySet());
+            removed.removeAll(updatedBrokerIds);
+
+            log.debug("ZK Brokers added: " + added + ", removed: " + removed);
+            added.forEach(brokerId -> {
+                Optional<ZkBrokerRegistration> broker = client.readBrokerRegistration(brokerId);
+                if (broker.isPresent()) {
+                    client.addZkBroker(brokerId);

Review Comment:
   I think the client addition of the broker should be done as the final step of ZkBrokerRegistration reconciliation in the KRaft controller. Once all the validations are done, I think we should first add a BrokerRegistrationChangeRecord for the addition or removal of the Zk broker. Also, validation itself is probably an async step since we decided to use heartbeats to see if brokers are ready.



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }

Review Comment:
   What would be ideal here? Kill and retry? Or simply add MigrateMetadataEvent again, technically making an async infinite loop in the worst case?



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }

Review Comment:
   Shouldn't this finally schedule a deferred event?



##########
metadata/src/main/java/org/apache/kafka/migration/MigrationState.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.kafka.migration;
+
+public enum MigrationState {

Review Comment:
   I think it would be better to illustrate the possible state transitions here.
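   For example (a sketch only; these are the transitions visible in this PR's PollEvent, KRaftLeaderEvent, BecomeZkLeaderEvent and MigrateMetadataEvent code, NOT_READY's transitions are still being discussed, and the enum may define more states than shown), a class-level comment could spell them out:

       /**
        * UNINITIALIZED -> INACTIVE      recovery state loaded from ZK
        * INACTIVE      -> NEW_LEADER    this node became the active KRaft controller
        * NEW_LEADER    -> ZK_MIGRATION  claimed /controller in ZK, ZK data not yet migrated
        * ZK_MIGRATION  -> DUAL_WRITE    ZK metadata copied into the KRaft metadata log
        * (any state)   -> INACTIVE      this node lost KRaft leadership
        */
       public enum MigrationState {
           UNINITIALIZED, INACTIVE, NEW_LEADER, NOT_READY, ZK_MIGRATION, DUAL_WRITE
       }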



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {

Review Comment:
   We should also have a ProcessHeartBeatEvent
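   Something like the following, perhaps (a sketch only; the event name, the readiness check, and the target states are assumptions based on this discussion, not code from the PR):

       // Hypothetical heartbeat-driven readiness check.
       class ProcessHeartBeatEvent implements EventQueue.Event {
           @Override
           public void run() throws Exception {
               boolean brokersReady = zkBrokerRegistrations.values().stream()
                   .allMatch(ZkBrokerRegistration::isMigrationReady);
               if (brokersReady && migrationState == MigrationState.NOT_READY) {
                   transitionTo(MigrationState.ZK_MIGRATION);
               } else if (!brokersReady && migrationState == MigrationState.ZK_MIGRATION) {
                   transitionTo(MigrationState.NOT_READY);
               }
           }

           @Override
           public void handleException(Throwable e) {
               log.error("Had an exception in " + this.getClass().getSimpleName(), e);
           }
       }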



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokersChangeEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            Set<Integer> updatedBrokerIds = client.readBrokerIds();
+            Set<Integer> added = new HashSet<>(updatedBrokerIds);
+            added.removeAll(zkBrokerRegistrations.keySet());
+
+            Set<Integer> removed = new HashSet<>(zkBrokerRegistrations.keySet());
+            removed.removeAll(updatedBrokerIds);
+
+            log.debug("ZK Brokers added: " + added + ", removed: " + removed);
+            added.forEach(brokerId -> {
+                Optional<ZkBrokerRegistration> broker = client.readBrokerRegistration(brokerId);
+                if (broker.isPresent()) {
+                    client.addZkBroker(brokerId);
+                    zkBrokerRegistrations.put(brokerId, broker.get());
+                } else {
+                    throw new IllegalStateException("Saw broker " + brokerId + " added, but registration data is missing");
+                }
+            });
+            removed.forEach(brokerId -> {
+                client.removeZkBroker(brokerId);
+                zkBrokerRegistrations.remove(brokerId);
+            });
+
+            // TODO actually verify the IBP and clusterID
+            boolean brokersReady = zkBrokerRegistrations.values().stream().allMatch(broker ->
+                broker.isMigrationReady() && broker.ibp().isPresent() && broker.clusterId().isPresent());
+            // TODO add some state to track if brokers are ready
+            if (brokersReady) {
+                log.debug("All ZK Brokers are ready for migration.");
+                //transitionTo(MigrationState.READY);
+            } else {
+                log.debug("Some ZK Brokers still not ready for migration.");
+                //transitionTo(MigrationState.INELIGIBLE);
+            }
+            // TODO integrate with ClusterControlManager
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokerIdChangeEvent implements EventQueue.Event {
+        private final int brokerId;
+
+        BrokerIdChangeEvent(int brokerId) {
+            this.brokerId = brokerId;
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO not sure this is expected. Can registration data change at runtime?
+            log.debug("Broker {} changed. New registration: {}", brokerId, client.readBrokerRegistration(brokerId));
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class KRaftLeaderEvent implements EventQueue.Event {
+        private final boolean isActive;
+        private final int kraftControllerId;
+        private final int kraftControllerEpoch;
+
+        KRaftLeaderEvent(boolean isActive, int kraftControllerId, int kraftControllerEpoch) {
+            this.isActive = isActive;
+            this.kraftControllerId = kraftControllerId;
+            this.kraftControllerEpoch = kraftControllerEpoch;
+        }
+
+        @Override
+        public void run() throws Exception {
+            if (migrationState == MigrationState.UNINITIALIZED) {
+                // If we get notified about being the active controller before we have initialized, we need
+                // to reschedule this event.
+                eventQueue.append(new PollEvent());
+                eventQueue.append(this);
+                return;
+            }
+
+            if (!isActive) {
+                apply("KRaftLeaderEvent is active", state -> state.mergeWithControllerState(ZkControllerState.EMPTY));
+                transitionTo(MigrationState.INACTIVE);
+            } else {
+                // Apply the new KRaft state
+                apply("KRaftLeaderEvent not active", state -> state.withNewKRaftController(kraftControllerId, kraftControllerEpoch));
+                // Instead of doing the ZK write directly, schedule as an event so that we can easily retry ZK failures
+                transitionTo(MigrationState.NEW_LEADER);
+                eventQueue.append(new BecomeZkLeaderEvent());
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BecomeZkLeaderEvent extends ZkWriteEvent {
+        @Override
+        public void run() throws Exception {
+            ZkControllerState zkControllerState = client.claimControllerLeadership(
+                recoveryState.kraftControllerId(), recoveryState.kraftControllerEpoch());
+            apply("BecomeZkLeaderEvent", state -> state.mergeWithControllerState(zkControllerState));
+
+            if (!recoveryState.zkMigrationComplete()) {
+                transitionTo(MigrationState.ZK_MIGRATION);

Review Comment:
   I think BECOME_LEADER should move to NOT_READY. Based on the heartbeats, the heartbeat event should then transition the state to either NOT_READY or ZK_MIGRATION.
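   In other words, something like this in BecomeZkLeaderEvent (a sketch of the suggestion above; the PR currently goes straight to ZK_MIGRATION here):

       if (!recoveryState.zkMigrationComplete()) {
           // Wait in NOT_READY until heartbeats show every ZK broker is migration-ready;
           // the heartbeat event would then move the driver to ZK_MIGRATION.
           transitionTo(MigrationState.NOT_READY);
       }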



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokersChangeEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            Set<Integer> updatedBrokerIds = client.readBrokerIds();
+            Set<Integer> added = new HashSet<>(updatedBrokerIds);
+            added.removeAll(zkBrokerRegistrations.keySet());
+
+            Set<Integer> removed = new HashSet<>(zkBrokerRegistrations.keySet());
+            removed.removeAll(updatedBrokerIds);
+
+            log.debug("ZK Brokers added: " + added + ", removed: " + removed);
+            added.forEach(brokerId -> {
+                Optional<ZkBrokerRegistration> broker = client.readBrokerRegistration(brokerId);
+                if (broker.isPresent()) {
+                    client.addZkBroker(brokerId);

Review Comment:
   Technically, I think this method doesn't need to talk directly to ClusterControlManager if we do the above.



##########
metadata/src/main/java/org/apache/kafka/migration/KRaftMigrationDriver.java:
##########
@@ -0,0 +1,414 @@
+package org.apache.kafka.migration;
+
+import org.apache.kafka.common.message.UpdateMetadataResponseData;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * serialize events coming from various threads and listeners.
+ */
+public class KRaftMigrationDriver {
+
+    class MetadataLogListener implements KRaftMetadataListener {
+        MetadataImage image = MetadataImage.EMPTY;
+        MetadataDelta delta = new MetadataDelta(image);
+
+        @Override
+        public void handleLeaderChange(boolean isActive, int epoch) {
+            eventQueue.append(new KRaftLeaderEvent(isActive, nodeId, epoch));
+        }
+
+        @Override
+        public void handleRecord(long offset, int epoch, ApiMessage record) {
+            if (record.apiKey() == MetadataRecordType.NO_OP_RECORD.id()) {
+                return;
+            }
+
+            eventQueue.append(new EventQueue.Event() {
+                @Override
+                public void run() throws Exception {
+                    if (delta == null) {
+                        delta = new MetadataDelta(image);
+                    }
+                    delta.replay(offset, epoch, record);
+                }
+
+                @Override
+                public void handleException(Throwable e) {
+                    log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+                }
+            });
+        }
+
+        ZkWriteEvent syncMetadataToZkEvent() {
+            return new ZkWriteEvent(){
+                @Override
+                public void run() throws Exception {
+                if (delta == null) {
+                    return;
+                }
+
+                log.info("Writing metadata changes to ZK");
+                try {
+                    apply("Sync to ZK", __ -> migrationState(delta.highestOffset(), delta.highestEpoch()));
+                    if (delta.topicsDelta() != null) {
+                        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+                            // Ensure the topic exists
+                            if (image.topics().getTopic(topicId) == null) {
+                                apply("Create topic " + topicDelta.name(), migrationState -> client.createTopic(topicDelta.name(), topicId, topicDelta.partitionChanges(), migrationState));
+                            } else {
+                                apply("Updating topic " + topicDelta.name(), migrationState -> client.updateTopicPartitions(Collections.singletonMap(topicDelta.name(), topicDelta.partitionChanges()), migrationState));
+                            }
+                        });
+                    }
+
+                    if (delta.clusterDelta() != null) {
+                        delta.clusterDelta().changedBrokers().forEach((brokerId, brokerRegistrationOpt) -> {
+                            if (brokerRegistrationOpt.isPresent() && image.cluster().broker(brokerId) == null) {
+                                apply("Create Broker " + brokerId, migrationState -> client.createKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else if (brokerRegistrationOpt.isPresent()) {
+                                apply("Update Broker " + brokerId, migrationState -> client.updateKRaftBroker(brokerId, brokerRegistrationOpt.get(), migrationState));
+                            } else {
+                                apply("Remove Broker " + brokerId, migrationState -> client.removeKRaftBroker(brokerId, migrationState));
+                            }
+                        });
+                    }
+                } finally {
+                    image = delta.apply();
+                    delta = null;
+                }
+                }
+            };
+        }
+    }
+
+    class ZkBrokerListener implements MigrationClient.BrokerRegistrationListener {
+        @Override
+        public void onBrokerChange(Integer brokerId) {
+            eventQueue.append(new BrokerIdChangeEvent(brokerId));
+        }
+
+        @Override
+        public void onBrokersChange() {
+            eventQueue.append(new BrokersChangeEvent());
+        }
+    }
+
+    abstract class RPCResponseEvent<T extends ApiMessage> implements EventQueue.Event {
+        private final int brokerId;
+        private final T data;
+
+        RPCResponseEvent(int brokerId, T data) {
+            this.brokerId = brokerId;
+            this.data = data;
+        }
+
+        int brokerId() {
+            return brokerId;
+        }
+        T data() {
+            return data;
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    abstract class ZkWriteEvent implements EventQueue.Event {
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class UpdateMetadataResponseEvent extends RPCResponseEvent<UpdateMetadataResponseData> {
+        UpdateMetadataResponseEvent(int brokerId, UpdateMetadataResponseData data) {
+            super(brokerId, data);
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO handle UMR response
+        }
+    }
+
+    class PollEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            switch (migrationState) {
+                case UNINITIALIZED:
+                    log.info("Recovering migration state");
+                    apply("Recovery", client::getOrCreateMigrationRecoveryState);
+                    client.watchZkBrokerRegistrations(new ZkBrokerListener());
+                    String maybeDone = recoveryState.zkMigrationComplete() ? "done" : "not done";
+                    log.info("Recovered migration state {}. ZK migration is {}.", recoveryState, maybeDone);
+                    transitionTo(MigrationState.INACTIVE);
+                    break;
+                case INACTIVE:
+                    break;
+                case NEW_LEADER:
+                    // This probably means we are retrying
+                    eventQueue.append(new BecomeZkLeaderEvent());
+                    break;
+                case NOT_READY:
+                    break;
+                case ZK_MIGRATION:
+                    eventQueue.append(new MigrateMetadataEvent());
+                    break;
+                case DUAL_WRITE:
+                    eventQueue.append(listener.syncMetadataToZkEvent());
+                    break;
+            }
+
+            // Poll again after some time
+            long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
+            eventQueue.scheduleDeferred(
+                "poll",
+                new EventQueue.DeadlineFunction(deadline),
+                new PollEvent());
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class MigrateMetadataEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            if (migrationState != MigrationState.ZK_MIGRATION) {
+                log.warn("Skipping ZK migration, already done");
+                return;
+            }
+
+            Set<Integer> brokersWithAssignments = new HashSet<>();
+            log.info("Begin migration from ZK");
+            consumer.beginMigration();
+            try {
+                // TODO use a KIP-868 metadata transaction here
+                List<CompletableFuture<?>> futures = new ArrayList<>();
+                client.readAllMetadata(batch -> futures.add(consumer.acceptBatch(batch)), brokersWithAssignments::add);
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).get();
+
+                Set<Integer> brokersWithRegistrations = new HashSet<>(zkBrokerRegistrations.keySet());
+                brokersWithAssignments.removeAll(brokersWithRegistrations);
+                if (!brokersWithAssignments.isEmpty()) {
+                    //throw new IllegalStateException("Cannot migrate data with offline brokers: " + brokersWithAssignments);
+                    log.error("Offline ZK brokers detected: {}", brokersWithAssignments);
+                }
+
+                // Update the migration state
+                OffsetAndEpoch offsetAndEpoch = consumer.completeMigration();
+                apply("Migrating ZK to KRaft", __ -> migrationState(offsetAndEpoch.offset, offsetAndEpoch.epoch));
+            } catch (Throwable t) {
+                log.error("Migration failed", t);
+                consumer.abortMigration();
+            } finally {
+                // TODO Just skip to dual write for now
+                apply("Persist recovery state", client::setMigrationRecoveryState);
+                transitionTo(MigrationState.DUAL_WRITE);
+            }
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokersChangeEvent implements EventQueue.Event {
+        @Override
+        public void run() throws Exception {
+            Set<Integer> updatedBrokerIds = client.readBrokerIds();
+            Set<Integer> added = new HashSet<>(updatedBrokerIds);
+            added.removeAll(zkBrokerRegistrations.keySet());
+
+            Set<Integer> removed = new HashSet<>(zkBrokerRegistrations.keySet());
+            removed.removeAll(updatedBrokerIds);
+
+            log.debug("ZK Brokers added: " + added + ", removed: " + removed);
+            added.forEach(brokerId -> {
+                Optional<ZkBrokerRegistration> broker = client.readBrokerRegistration(brokerId);
+                if (broker.isPresent()) {
+                    client.addZkBroker(brokerId);
+                    zkBrokerRegistrations.put(brokerId, broker.get());
+                } else {
+                    throw new IllegalStateException("Saw broker " + brokerId + " added, but registration data is missing");
+                }
+            });
+            removed.forEach(brokerId -> {
+                client.removeZkBroker(brokerId);
+                zkBrokerRegistrations.remove(brokerId);
+            });
+
+            // TODO actually verify the IBP and clusterID
+            boolean brokersReady = zkBrokerRegistrations.values().stream().allMatch(broker ->
+                broker.isMigrationReady() && broker.ibp().isPresent() && broker.clusterId().isPresent());
+            // TODO add some state to track if brokers are ready
+            if (brokersReady) {
+                log.debug("All ZK Brokers are ready for migration.");
+                //transitionTo(MigrationState.READY);
+            } else {
+                log.debug("Some ZK Brokers still not ready for migration.");
+                //transitionTo(MigrationState.INELIGIBLE);
+            }
+            // TODO integrate with ClusterControlManager
+        }
+
+        @Override
+        public void handleException(Throwable e) {
+            log.error("Had an exception in " + this.getClass().getSimpleName(), e);
+        }
+    }
+
+    class BrokerIdChangeEvent implements EventQueue.Event {
+        private final int brokerId;
+
+        BrokerIdChangeEvent(int brokerId) {
+            this.brokerId = brokerId;
+        }
+
+        @Override
+        public void run() throws Exception {
+            // TODO not sure this is expected. Can registration data change at runtime?
+            log.debug("Broker {} changed. New registration: {}", brokerId, client.readBrokerRegistration(brokerId));
+        }

Review Comment:
   How are broker deregistrations handled in ZK then?



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -1772,6 +1895,106 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
+  // Perform a sequence of updates to ZooKeeper as part of a KRaft dual write. In addition to adding a CheckOp on the
+  // controller epoch ZNode, we also include CheckOp/SetDataOp on the migration ZNode. This ensure proper fencing
+  // from errant ZK controllers as well as fencing from new KRaft controllers.
+  def retryMigrationRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req],
+                                                                expectedControllerZkVersion: Int,
+                                                                migrationState: MigrationRecoveryState): (Int, Seq[Req#Response]) = {
+
+    if (requests.isEmpty) {
+      throw new IllegalArgumentException("Must specify at least one ZK request for a migration operation.")
+    }
+    def wrapMigrationRequest(request: Req, updateMigrationNode: Boolean): MultiRequest = {
+      val checkOp = CheckOp(ControllerEpochZNode.path, expectedControllerZkVersion)
+      val migrationOp = if (updateMigrationNode) {
+        SetDataOp(MigrationZNode.path, MigrationZNode.encode(migrationState), migrationState.migrationZkVersion())
+      } else {
+        CheckOp(MigrationZNode.path, migrationState.migrationZkVersion())
+      }
+
+      request match {
+        case CreateRequest(path, data, acl, createMode, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, CreateOp(path, data, acl, createMode)), ctx)
+        case DeleteRequest(path, version, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, DeleteOp(path, version)), ctx)
+        case SetDataRequest(path, data, version, ctx) =>
+          MultiRequest(Seq(checkOp, migrationOp, SetDataOp(path, data, version)), ctx)
+        case _ => throw new IllegalStateException(s"$request does not need controller epoch check")
+      }
+    }
+
+    def handleUnwrappedMigrationResult(migrationOp: ZkOp, migrationResult: OpResult): Int = {
+      val (path: String, data: Option[Array[Byte]], version: Int) = migrationOp match {
+        case CheckOp(path, version) => (path, None, version)
+        case SetDataOp(path, data, version) => (path, Some(data), version)
+        case _ => throw new IllegalStateException("Unexpected result on /migration znode")
+      }
+
+      migrationResult match {
+        case _: CheckResult => version
+        case setDataResult: SetDataResult => setDataResult.getStat.getVersion
+        case errorResult: ErrorResult =>
+          if (path.equals(MigrationZNode.path)) {
+            val errorCode = Code.get(errorResult.getErr)
+            if (errorCode == Code.BADVERSION) {
+              data match {
+                case Some(value) =>
+                  val failedPayload = MigrationZNode.decode(value, version)
+                  throw new RuntimeException(s"Conditional update on KRaft Migration znode failed. Expected zkVersion = ${version}. " +
+                    s"The failed write was: ${failedPayload}. This indicates that another KRaft controller is making writes to ZooKeeper.")
+                case None =>
+                  throw new RuntimeException(s"Check op on KRaft Migration znode failed. Expected zkVersion = ${version}. " +
+                    s"This indicates that another KRaft controller is making writes to ZooKeeper.")
+              }
+            } else if (errorCode == Code.OK) {
+              // what?
+              version
+            } else {
+              throw KeeperException.create(errorCode, path)
+            }
+          } else {
+            throw new RuntimeException(s"Got migration result for incorrect path $path")
+          }
+        case _ => throw new RuntimeException(s"Expected either CheckResult, SetDataResult, or ErrorResult for migration op, but saw ${migrationResult}")
+      }
+    }
+
+    def unwrapMigrationRequest(response: AsyncResponse): (AsyncResponse, Int) = {

Review Comment:
   Should this be named unwrapMigrationResponse?



##########
core/src/main/scala/kafka/server/KafkaRaftServer.scala:
##########
@@ -109,6 +113,13 @@ class KafkaRaftServer(
   }
 
   private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
+    // TODO clean these up
+    val zkClient = KafkaServer.zkClient("KRaft migration", time, config, KafkaServer.zkClientConfigFromKafkaConfig(config))
+    val stateChangeLogger = new StateChangeLogger(-1, inControllerContext = false, None)
+    val channelManager = new ControllerChannelManager(() => -1, config, Time.SYSTEM, new Metrics(), stateChangeLogger)
+    val migrationClient = new ZkMigrationClient(zkClient, channelManager)

Review Comment:
   Just to note: even though we named the class `ZkMigrationClient`, we seem to be using it not only to communicate with ZooKeeper but also with ZooKeeper-based brokers.
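   A minimal sketch, for illustration only, of how those two responsibilities could be split. The interface names below are invented for this sketch and are not part of the PR; the method signatures mirror the ones shown in this diff:

   ```java
   // Hypothetical split: ZooKeeper metadata access vs. RPCs to ZooKeeper-based brokers.
   // Interface names are invented for illustration and do not come from this PR.
   import java.util.List;
   import java.util.function.Consumer;

   import org.apache.kafka.common.requests.AbstractControlRequest;
   import org.apache.kafka.common.requests.AbstractResponse;
   import org.apache.kafka.server.common.ApiMessageAndVersion;

   interface ZkMetadataMigrationClient {
       // Reads ZooKeeper metadata and hands it to the controller as record batches.
       void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer,
                            Consumer<Integer> brokerIdConsumer);
   }

   interface ZkBrokerRequestSender {
       // Sends controller-style requests to ZooKeeper-based brokers and delivers each response to the callback.
       void sendRequestToBroker(int brokerId,
                                AbstractControlRequest.Builder<? extends AbstractControlRequest> request,
                                Consumer<AbstractResponse> callback);
   }
   ```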



##########
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##########
@@ -0,0 +1,359 @@
+package kafka.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk._
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse}
+import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid}
+import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, VersionRange}
+import org.apache.kafka.migration._
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.zookeeper.CreateMode
+
+import java.util
+import java.util.function.Consumer
+import java.util.{Collections, Optional}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
+
+object ZkMigrationClient {
+  def brokerToBrokerRegistration(broker: Broker, epoch: Long): ZkBrokerRegistration = {
+      val registration = new BrokerRegistration(broker.id, epoch, Uuid.ZERO_UUID,
+        Collections.emptyList[Endpoint], Collections.emptyMap[String, VersionRange],
+        Optional.empty(), false, false)
+      new ZkBrokerRegistration(registration, null, null, false)
+  }
+}
+
+class ZkMigrationClient(zkClient: KafkaZkClient,
+                        controllerChannelManager: ControllerChannelManager) extends MigrationClient with Logging {
+
+  def claimControllerLeadership(kraftControllerId: Int, kraftControllerEpoch: Int): ZkControllerState = {
+    val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController(kraftControllerId, kraftControllerEpoch)
+    if (epochZkVersionOpt.isDefined) {
+      new ZkControllerState(kraftControllerId, kraftControllerEpoch, epochZkVersionOpt.get)
+    } else {
+      throw new ControllerMovedException("Cannot claim controller leadership, the controller has moved.")
+    }
+  }
+
+  def migrateTopics(metadataVersion: MetadataVersion,
+                    recordConsumer: Consumer[util.List[ApiMessageAndVersion]],
+                    brokerIdConsumer: Consumer[Integer]): Unit = {
+    val topics = zkClient.getAllTopicsInCluster()
+    val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
+      val partitions = assignments.keys.toSeq
+      val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
+      val topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+      topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+        .setName(topic)
+        .setTopicId(topicIdOpt.get), TopicRecord.HIGHEST_SUPPORTED_VERSION))
+
+      assignments.foreach { case (topicPartition, replicaAssignment) =>
+        replicaAssignment.replicas.foreach(brokerIdConsumer.accept(_))
+        replicaAssignment.addingReplicas.foreach(brokerIdConsumer.accept(_))
+
+        val leaderIsrAndEpoch = leaderIsrAndControllerEpochs(topicPartition)
+        topicBatch.add(new ApiMessageAndVersion(new PartitionRecord()
+          .setTopicId(topicIdOpt.get)
+          .setPartitionId(topicPartition.partition)
+          .setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
+          .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
+          .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
+          .setIsr(leaderIsrAndEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava)
+          .setLeader(leaderIsrAndEpoch.leaderAndIsr.leader)
+          .setLeaderEpoch(leaderIsrAndEpoch.leaderAndIsr.leaderEpoch)
+          .setPartitionEpoch(leaderIsrAndEpoch.leaderAndIsr.partitionEpoch)
+          .setLeaderRecoveryState(leaderIsrAndEpoch.leaderAndIsr.leaderRecoveryState.value()), PartitionRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      val props = topicConfigs(topic)
+      props.forEach { case (key: Object, value: Object) =>
+        topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.TOPIC.id)
+          .setResourceName(topic)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+
+      recordConsumer.accept(topicBatch)
+    }
+  }
+
+  def migrateBrokerConfigs(metadataVersion: MetadataVersion,
+                           recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker)
+    val batch = new util.ArrayList[ApiMessageAndVersion]()
+    zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) =>
+      val brokerResource = if (broker == ConfigEntityName.Default) {
+        ""
+      } else {
+        broker
+      }
+      props.forEach { case (key: Object, value: Object) =>
+        batch.add(new ApiMessageAndVersion(new ConfigRecord()
+          .setResourceType(ConfigResource.Type.BROKER.id)
+          .setResourceName(brokerResource)
+          .setName(key.toString)
+          .setValue(value.toString), ConfigRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+    }
+    recordConsumer.accept(batch)
+  }
+
+  def migrateClientQuotas(metadataVersion: MetadataVersion,
+                          recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val adminZkClient = new AdminZkClient(zkClient)
+
+    def migrateEntityType(entityType: String): Unit = {
+      adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) =>
+        val entity = new EntityData().setEntityType(entityType).setEntityName(name)
+        val batch = new util.ArrayList[ApiMessageAndVersion]()
+        ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+          batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+            .setEntity(List(entity).asJava)
+            .setKey(key)
+            .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+        }
+        recordConsumer.accept(batch)
+      }
+    }
+
+    migrateEntityType(ConfigType.User)
+    migrateEntityType(ConfigType.Client)
+    adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) =>
+      // Lifted from ZkAdminManager
+      val components = name.split("/")
+      if (components.size != 3 || components(1) != "clients")
+        throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+      val entity = List(
+        new EntityData().setEntityType(ConfigType.User).setEntityName(components(0)),
+        new EntityData().setEntityType(ConfigType.Client).setEntityName(components(2))
+      )
+
+      val batch = new util.ArrayList[ApiMessageAndVersion]()
+      ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) =>
+        batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
+          .setEntity(entity.asJava)
+          .setKey(key)
+          .setValue(value), ClientQuotaRecord.HIGHEST_SUPPORTED_VERSION))
+      }
+      recordConsumer.accept(batch)
+    }
+
+    migrateEntityType(ConfigType.Ip)
+  }
+
+  def migrateProducerId(metadataVersion: MetadataVersion,
+                        recordConsumer: Consumer[util.List[ApiMessageAndVersion]]): Unit = {
+    val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+    dataOpt match {
+      case Some(data) =>
+        val producerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+        recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
+          .setBrokerEpoch(-1)
+          .setBrokerId(producerIdBlock.assignedBrokerId)
+          .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+      case None => // Nothing to migrate
+    }
+  }
+
+  override def readAllMetadata(batchConsumer: Consumer[util.List[ApiMessageAndVersion]], brokerIdConsumer: Consumer[Integer]): Unit = {
+    migrateTopics(MetadataVersion.latest(), batchConsumer, brokerIdConsumer)
+    migrateBrokerConfigs(MetadataVersion.latest(), batchConsumer)
+    migrateClientQuotas(MetadataVersion.latest(), batchConsumer)
+    migrateProducerId(MetadataVersion.latest(), batchConsumer)
+  }
+
+  override def watchZkBrokerRegistrations(listener: MigrationClient.BrokerRegistrationListener): Unit = {
+    val brokersHandler = new ZNodeChildChangeHandler() {
+      override val path: String = BrokerIdsZNode.path
+
+      override def handleChildChange(): Unit = listener.onBrokersChange()
+    }
+    System.err.println("Adding /brokers watch")
+    zkClient.registerZNodeChildChangeHandler(brokersHandler)
+
+    def brokerHandler(brokerId: Int): ZNodeChangeHandler = {
+      new ZNodeChangeHandler() {
+        override val path: String = BrokerIdZNode.path(brokerId)
+
+        override def handleDataChange(): Unit = listener.onBrokerChange(brokerId)
+      }
+    }
+
+    val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster()
+    curBrokerAndEpochs.foreach { case (broker, _) =>
+      System.err.println(s"Adding /brokers/${broker.id} watch")
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerHandler(broker.id))
+    }
+
+    listener.onBrokersChange()
+  }
+
+  override def readBrokerRegistration(brokerId: Int): Optional[ZkBrokerRegistration] = {
+    val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
+    if (brokerAndEpoch.isEmpty) {
+      Optional.empty()
+    } else {
+      Optional.of(brokerToBrokerRegistration(brokerAndEpoch.head._1, brokerAndEpoch.head._2))
+    }
+  }
+
+  override def readBrokerIds(): util.Set[Integer] = {
+    zkClient.getSortedBrokerList.map(Integer.valueOf).toSet.asJava
+  }
+
+  override def addZkBroker(brokerId: Int): Unit = {
+    val brokerAndEpoch = zkClient.getAllBrokerAndEpochsInCluster(Seq(brokerId))
+    controllerChannelManager.addBroker(brokerAndEpoch.head._1)
+  }
+
+  override def removeZkBroker(brokerId: Int): Unit = {
+    controllerChannelManager.removeBroker(brokerId)
+  }
+
+  override def getOrCreateMigrationRecoveryState(initialState: MigrationRecoveryState): MigrationRecoveryState = {
+    zkClient.getOrCreateMigrationState(initialState)
+  }
+
+  override def setMigrationRecoveryState(state: MigrationRecoveryState): MigrationRecoveryState = {
+    zkClient.updateMigrationState(state)
+  }
+
+  override def sendRequestToBroker(brokerId: Int,
+                                   request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
+                                   callback: Consumer[AbstractResponse]): Unit = {
+    controllerChannelManager.sendRequest(brokerId, request, callback.accept)
+  }
+
+  override def createTopic(topicName: String, topicId: Uuid, partitions: util.Map[Integer, PartitionRegistration], state: MigrationRecoveryState): MigrationRecoveryState = {
+    val assignments = partitions.asScala.map { case (partitionId, partition) =>
+      new TopicPartition(topicName, partitionId) -> ReplicaAssignment(partition.replicas, partition.addingReplicas, partition.removingReplicas)
+    }
+
+    val createTopicZNode = {
+      val path = TopicZNode.path(topicName)
+      CreateRequest(
+        path,
+        TopicZNode.encode(Some(topicId), assignments),
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+    val createPartitionsZNode = {
+      val path = TopicPartitionsZNode.path(topicName)
+      CreateRequest(
+        path,
+        null,
+        zkClient.defaultAcls(path),
+        CreateMode.PERSISTENT)
+    }
+
+    val createPartitionZNodeReqs = partitions.asScala.flatMap { case (partitionId, partition) =>
+      val topicPartition = new TopicPartition(topicName, partitionId)
+      Seq(
+        createTopicPartition(topicPartition),
+        createTopicPartitionState(topicPartition, partition, state.kraftControllerEpoch())
+      )
+    }
+
+    val requests = Seq(createTopicZNode, createPartitionsZNode) ++ createPartitionZNodeReqs
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }
+
+  private def createTopicPartition(topicPartition: TopicPartition): CreateRequest = {
+    val path = TopicPartitionZNode.path(topicPartition)
+    CreateRequest(path, null, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def createTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): CreateRequest = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    CreateRequest(path, data, zkClient.defaultAcls(path), CreateMode.PERSISTENT, Some(topicPartition))
+  }
+
+  private def updateTopicPartitionState(topicPartition: TopicPartition, partitionRegistration: PartitionRegistration, controllerEpoch: Int): SetDataRequest = {
+    val path = TopicPartitionStateZNode.path(topicPartition)
+    val data = TopicPartitionStateZNode.encode(LeaderIsrAndControllerEpoch(new LeaderAndIsr(
+      partitionRegistration.leader,
+      partitionRegistration.leaderEpoch,
+      partitionRegistration.isr.toList,
+      partitionRegistration.leaderRecoveryState,
+      partitionRegistration.partitionEpoch), controllerEpoch))
+    SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
+  }
+
+  override def updateTopicPartitions(topicPartitions: util.Map[String, util.Map[Integer, PartitionRegistration]],
+                                     state: MigrationRecoveryState): MigrationRecoveryState = {
+    val requests = topicPartitions.asScala.flatMap { case (topicName, partitionRegistrations) =>
+      partitionRegistrations.asScala.flatMap { case (partitionId, partitionRegistration) =>
+        val topicPartition = new TopicPartition(topicName, partitionId)
+        Seq(updateTopicPartitionState(topicPartition, partitionRegistration, state.kraftControllerEpoch()))
+      }
+    }
+    if (requests.isEmpty) {
+      state
+    } else {
+      val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests.toSeq, state.controllerZkVersion(), state)
+      responses.foreach(System.err.println)
+      state.withZkVersion(migrationZkVersion)
+    }
+  }
+
+  override def createKRaftBroker(brokerId: Int, brokerRegistration: BrokerRegistration, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val brokerInfo = BrokerInfo(
+      Broker(
+        id = brokerId,
+        endPoints = brokerRegistration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
+        rack = brokerRegistration.rack().toScala),
+      MetadataVersion.latest(), // TODO ???
+      -1
+    )
+    val req = CreateRequest(brokerInfo.path, brokerInfo.toJsonBytes, zkClient.defaultAcls(brokerInfo.path), CreateMode.PERSISTENT)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }

Review Comment:
   Why are we registering the KRaft broker in ZooKeeper?



##########
metadata/src/main/java/org/apache/kafka/migration/MigrationState.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.kafka.migration;
+
+public enum MigrationState {
+    UNINITIALIZED(false),   // Initial state
+    INACTIVE(false),        // State when not the active controller
+    NEW_LEADER(false),      // State after KRaft leader election and before ZK leadership claim

Review Comment:
   NEW_LEADER --> BECOME_LEADER?
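   For reference, a sketch of how the suggested rename might read. The boolean flag and its name (`active`) are assumptions made for this sketch, since the diff above does not show the rest of the enum:

   ```java
   // Hypothetical rendering of the rename suggestion; not the code from this PR.
   public enum MigrationState {
       UNINITIALIZED(false),   // Initial state
       INACTIVE(false),        // State when not the active controller
       BECOME_LEADER(false);   // State after KRaft leader election and before the ZK leadership claim

       private final boolean active;   // assumed meaning of the constructor flag

       MigrationState(boolean active) {
           this.active = active;
       }

       public boolean isActive() {
           return active;
       }
   }
   ```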





[GitHub] [kafka] mumrah closed pull request #12815: KIP-866 Part 1

Posted by GitBox <gi...@apache.org>.
mumrah closed pull request #12815: KIP-866 Part 1
URL: https://github.com/apache/kafka/pull/12815




[GitHub] [kafka] mumrah commented on a diff in pull request #12815: KIP-866 Part 1

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12815:
URL: https://github.com/apache/kafka/pull/12815#discussion_r1025397641


##########
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##########
@@ -0,0 +1,359 @@
+  override def createKRaftBroker(brokerId: Int, brokerRegistration: BrokerRegistration, state: MigrationRecoveryState): MigrationRecoveryState = {
+    val brokerInfo = BrokerInfo(
+      Broker(
+        id = brokerId,
+        endPoints = brokerRegistration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
+        rack = brokerRegistration.rack().toScala),
+      MetadataVersion.latest(), // TODO ???
+      -1
+    )
+    val req = CreateRequest(brokerInfo.path, brokerInfo.toJsonBytes, zkClient.defaultAcls(brokerInfo.path), CreateMode.PERSISTENT)
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(Seq(req), state.controllerZkVersion(), state)
+    responses.foreach(System.err.println)
+    state.withZkVersion(migrationZkVersion)
+  }

Review Comment:
   This was part of an earlier design, when broker registrations were bidirectional. I'll remove this.


