You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/13 21:15:43 UTC

[kafka] branch trunk updated: KAFKA-14448 Let ZK brokers register with KRaft controller (#12965)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 67c72596afe KAFKA-14448 Let ZK brokers register with KRaft controller (#12965)
67c72596afe is described below

commit 67c72596afe58363eceeb32084c5c04637a33831
Author: David Arthur <mu...@gmail.com>
AuthorDate: Tue Dec 13 16:15:21 2022 -0500

    KAFKA-14448 Let ZK brokers register with KRaft controller (#12965)
    
    Prior to starting a KIP-866 migration, the ZK brokers must register themselves with the active
    KRaft controller. The controller waits for all brokers to register in order to verify that all the
    brokers:
    
    A) Can communicate with the quorum
    B) Have the migration config enabled
    C) Have the proper IBP set
    
    This patch uses the new isMigratingZkBroker field in BrokerRegistrationRequest and
    RegisterBrokerRecord. The field's type in BrokerRegistrationRequest was changed from int8 to bool
    (fixing a mistake from #12860). The ZK brokers use the existing BrokerLifecycleManager class to
    register and heartbeat with the controllers.
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Colin P. McCabe <cm...@apache.org>
---
 .../common/requests/BrokerRegistrationRequest.java |   9 +
 .../common/message/BrokerRegistrationRequest.json  |   4 +-
 .../common/message/BrokerRegistrationResponse.json |   2 +-
 .../kafka/server/BrokerLifecycleManager.scala      |  20 ++-
 .../server/BrokerToControllerChannelManager.scala  |   4 +-
 .../main/scala/kafka/server/ControllerServer.scala |   3 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  11 +-
 core/src/main/scala/kafka/server/KafkaServer.scala | 111 ++++++++++--
 .../server/metadata/OffsetTrackingListener.scala   |  48 +++++
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  19 +-
 core/src/test/java/kafka/test/ClusterInstance.java |   2 +
 .../test/junit/RaftClusterInvocationContext.java   |   8 +
 .../test/junit/ZkClusterInvocationContext.java     |   8 +-
 .../server/KafkaServerKRaftRegistrationTest.scala  |  85 +++++++++
 .../server/BrokerRegistrationRequestTest.scala     | 197 +++++++++++++++++++++
 .../kafka/controller/ClusterControlManager.java    |  33 +++-
 .../apache/kafka/controller/QuorumController.java  |  13 +-
 .../apache/kafka/metadata/BrokerRegistration.java  |  55 +++++-
 .../kafka/metadata/BrokerRegistrationTest.java     |   4 +-
 .../kafka/server/common/MetadataVersion.java       |   9 +-
 .../kafka/server/common/MetadataVersionTest.java   |  10 +-
 21 files changed, 608 insertions(+), 47 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
index 2ba1529e72a..18d6a070d05 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
@@ -34,6 +34,15 @@ public class BrokerRegistrationRequest extends AbstractRequest {
             this.data = data;
         }
 
+        @Override
+        public short oldestAllowedVersion() {
+            if (data.isMigratingZkBroker()) {
+                return (short) 1;
+            } else {
+                return (short) 0;
+            }
+        }
+
         @Override
         public BrokerRegistrationRequest build(short version) {
             return new BrokerRegistrationRequest(data, version);
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index c29e190dd01..19ad8f249b3 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -18,7 +18,7 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "BrokerRegistrationRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@@ -51,7 +51,7 @@
     },
     { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
       "about": "The rack which this broker is in." },
-    { "name": "IsMigratingZkBroker", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 0, "ignorable": true,
+    { "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
       "about": "Set by a ZK broker if the required configurations for ZK migration are present." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
index 1f12123b04b..7515d5ee4bc 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
@@ -17,7 +17,7 @@
   "apiKey": 62,
   "type": "response",
   "name": "BrokerRegistrationResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 39dff71ad11..9f361275d4e 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -51,9 +51,13 @@ import scala.jdk.CollectionConverters._
  * In some cases we expose a volatile variable which can be read from any thread, but only
  * written from the event queue thread.
  */
-class BrokerLifecycleManager(val config: KafkaConfig,
-                             val time: Time,
-                             val threadNamePrefix: Option[String]) extends Logging {
+class BrokerLifecycleManager(
+  val config: KafkaConfig,
+  val time: Time,
+  val threadNamePrefix: Option[String],
+  val isZkBroker: Boolean = false
+) extends Logging {
+
   val logContext = new LogContext(s"[BrokerLifecycleManager id=${config.nodeId}] ")
 
   this.logIdent = logContext.logPrefix()
@@ -266,9 +270,12 @@ class BrokerLifecycleManager(val config: KafkaConfig,
       _clusterId = clusterId
       _advertisedListeners = advertisedListeners.duplicate()
       _supportedFeatures = new util.HashMap[String, VersionRange](supportedFeatures)
-      eventQueue.scheduleDeferred("initialRegistrationTimeout",
-        new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
-        new RegistrationTimeoutEvent())
+      if (!isZkBroker) {
+        // ZK brokers don't block on registration during startup
+        eventQueue.scheduleDeferred("initialRegistrationTimeout",
+          new DeadlineFunction(time.nanoseconds() + initialTimeoutNs),
+          new RegistrationTimeoutEvent())
+      }
       sendBrokerRegistration()
       info(s"Incarnation $incarnationId of broker $nodeId in cluster $clusterId " +
         "is now STARTING.")
@@ -285,6 +292,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
     }
     val data = new BrokerRegistrationRequestData().
         setBrokerId(nodeId).
+        setIsMigratingZkBroker(isZkBroker).
         setClusterId(_clusterId).
         setFeatures(features).
         setIncarnationId(incarnationId).
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 92754a793f5..3d1e5d3f63c 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -367,7 +367,7 @@ class BrokerToControllerRequestThread(
     if (activeControllerAddress().isDefined) {
       super.pollOnce(Long.MaxValue)
     } else {
-      debug("Controller isn't cached, looking for local metadata changes")
+      debug("Controller isn't known, checking with controller provider")
       controllerNodeProvider.get() match {
         case Some(controllerNode) =>
           info(s"Recorded new controller, from now on will use node $controllerNode")
@@ -375,7 +375,7 @@ class BrokerToControllerRequestThread(
           metadataUpdater.setNodes(Seq(controllerNode).asJava)
         case None =>
           // need to backoff to avoid tight loops
-          debug("No controller defined in metadata cache, retrying after backoff")
+          debug("No controller provided, retrying after backoff")
           super.pollOnce(maxTimeoutMs = 100)
       }
     }
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 2bd518cde2a..03b86b7067e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -204,7 +204,8 @@ class ControllerServer(
           setConfigurationValidator(new ControllerConfigurationValidator()).
           setStaticConfig(config.originals).
           setBootstrapMetadata(bootstrapMetadata).
-          setFatalFaultHandler(sharedServer.quorumControllerFaultHandler)
+          setFatalFaultHandler(sharedServer.quorumControllerFaultHandler).
+          setZkMigrationEnabled(config.migrationEnabled)
       }
       authorizer match {
         case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 43ac98f53c8..7be37ee9e71 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2230,8 +2230,15 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       validateAdvertisedListenersNonEmptyForBroker()
     } else {
       // ZK-based
-      // controller listener names must be empty when not in KRaft mode
-      require(controllerListenerNames.isEmpty, s"${KafkaConfig.ControllerListenerNamesProp} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
+      if (migrationEnabled) {
+        validateNonEmptyQuorumVotersForKRaft()
+        require(controllerListenerNames.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must not be empty when running in ZK migration mode: ${controllerListenerNames.asJava}")
+      } else {
+        // controller listener names must be empty when not in KRaft mode
+        require(controllerListenerNames.isEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
+      }
       validateAdvertisedListenersNonEmptyForBroker()
     }
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index bc13a0986e9..79a621c6b54 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -21,7 +21,6 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, SocketTimeoutException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
-
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException}
 import kafka.controller.KafkaController
@@ -30,13 +29,15 @@ import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsReporter
 import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer}
+import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
-import kafka.server.metadata.{ZkConfigRepository, ZkMetadataCache}
+import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
 import kafka.utils._
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
 import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
@@ -47,8 +48,10 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, Node}
-import org.apache.kafka.metadata.BrokerState
+import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange}
+import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.zookeeper.client.ZKClientConfig
@@ -181,6 +184,8 @@ class KafkaServer(
 
   def kafkaController: KafkaController = _kafkaController
 
+  var lifecycleManager: BrokerLifecycleManager = _
+
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
@@ -199,6 +204,11 @@ class KafkaServer(
       if (canStartup) {
         _brokerState = BrokerState.STARTING
 
+        lifecycleManager = new BrokerLifecycleManager(config,
+          time,
+          threadNamePrefix,
+          isZkBroker = true)
+
         /* setup zookeeper */
         initZkClient(time)
         configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
@@ -335,7 +345,65 @@ class KafkaServer(
         val brokerEpoch = zkClient.registerBroker(brokerInfo)
 
         // Now that the broker is successfully registered, checkpoint its metadata
-        checkpointBrokerMetadata(ZkMetaProperties(clusterId, config.brokerId))
+        val zkMetaProperties = ZkMetaProperties(clusterId, config.brokerId)
+        checkpointBrokerMetadata(zkMetaProperties)
+
+        if (config.migrationEnabled) {
+          // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
+          val kraftMetaProps = MetaProperties(zkMetaProperties.clusterId, zkMetaProperties.brokerId)
+          val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
+            RaftConfig.parseVoterConnections(config.quorumVoters))
+          val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
+            kraftMetaProps,
+            config,
+            new MetadataRecordSerde,
+            KafkaRaftServer.MetadataPartition,
+            KafkaRaftServer.MetadataTopicId,
+            time,
+            metrics,
+            threadNamePrefix,
+            controllerQuorumVotersFuture
+          )
+          val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
+          val quorumControllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
+          val brokerToQuorumChannelManager = BrokerToControllerChannelManager(
+            controllerNodeProvider = quorumControllerNodeProvider,
+            time = time,
+            metrics = metrics,
+            config = config,
+            channelName = "quorum",
+            threadNamePrefix = threadNamePrefix,
+            retryTimeoutMs = config.requestTimeoutMs.longValue
+          )
+
+          val listener = new OffsetTrackingListener()
+          raftManager.register(listener)
+
+          val networkListeners = new ListenerCollection()
+          config.effectiveAdvertisedListeners.foreach { ep =>
+            networkListeners.add(new Listener().
+              setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
+              setName(ep.listenerName.value()).
+              setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
+              setSecurityProtocol(ep.securityProtocol.id))
+          }
+
+          // Even though ZK brokers don't use "metadata.version" feature, we send our IBP here as part of the broker registration
+          // so the KRaft controller can verify that all brokers are on the same IBP before starting the migration.
+          val ibpAsFeature =
+           java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+             VersionRange.of(config.interBrokerProtocolVersion.featureLevel(), config.interBrokerProtocolVersion.featureLevel()))
+
+          lifecycleManager.start(
+            () => listener.highestOffset,
+            brokerToQuorumChannelManager,
+            kraftMetaProps.clusterId,
+            networkListeners,
+            ibpAsFeature
+          )
+
+          raftManager.startup()
+        }
 
         /* start token manager */
         tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
@@ -453,6 +521,17 @@ class KafkaServer(
         dynamicConfigManager = new ZkConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
+        if (config.migrationEnabled && lifecycleManager != null) {
+          lifecycleManager.initialCatchUpFuture.whenComplete { case (_, t) =>
+            if (t != null) {
+              fatal("Encountered an exception when waiting to catch up with KRaft metadata log", t)
+              shutdown()
+            } else {
+              info("Finished catching up on KRaft metadata log, requesting that the KRaft controller unfence this broker")
+              lifecycleManager.setReadyToUnfence()
+            }
+          }
+        }
         socketServer.enableRequestProcessing(authorizerFutures)
 
         _brokerState = BrokerState.RUNNING
@@ -491,17 +570,7 @@ class KafkaServer(
 
   private def initZkClient(time: Time): Unit = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
-
-    val secureAclsEnabled = config.zkEnableSecureAcls
-    val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled() || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
-
-    if (secureAclsEnabled && !isZkSecurityEnabled)
-      throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client TLS configuration identifying at least $KafkaConfig.ZkSslClientEnableProp, $KafkaConfig.ZkClientCnxnSocketProp, and $KafkaConfig.ZkSslKeyStoreLocationProp was not present and the " +
-        s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}")
-
-    _zkClient = KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      config.zkMaxInFlightRequests, time, name = "Kafka server", zkClientConfig = zkClientConfig,
-      createChrootIfNecessary = true)
+    _zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)
     _zkClient.createTopLevelPaths()
   }
 
@@ -532,6 +601,7 @@ class KafkaServer(
     )
 
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+
     BrokerInfo(
       Broker(config.brokerId, updatedEndpoints, config.rack, brokerFeatures.supportedFeatures),
       config.interBrokerProtocolVersion,
@@ -690,6 +760,14 @@ class KafkaServer(
 
       _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
 
+      if (config.migrationEnabled && lifecycleManager != null) {
+        // TODO KAFKA-14447 Only use KRaft controlled shutdown (when in migration mode)
+        // For now we'll send the heartbeat with WantShutDown set so the KRaft controller can see a broker
+        // shutting down without waiting for the heartbeat to time out.
+        info("Notifying KRaft of controlled shutdown")
+        lifecycleManager.beginControlledShutdown()
+      }
+
       val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
 
       if (!shutdownSucceeded)
@@ -793,6 +871,9 @@ class KafkaServer(
         // Clear all reconfigurable instances stored in DynamicBrokerConfig
         config.dynamicConfig.clear()
 
+        if (lifecycleManager != null) {
+          lifecycleManager.close()
+        }
         _brokerState = BrokerState.NOT_RUNNING
 
         startupComplete.set(false)
diff --git a/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala b/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala
new file mode 100644
index 00000000000..056b6f0fec1
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/OffsetTrackingListener.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import org.apache.kafka.raft.{BatchReader, RaftClient}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotReader
+
+/**
+ *  A simple Raft listener that only keeps track of the highest offset seen. Used for registration of ZK
+ *  brokers with the KRaft controller during a KIP-866 migration.
+ */
+class OffsetTrackingListener extends RaftClient.Listener[ApiMessageAndVersion] {
+  @volatile var _highestOffset = 0L
+
+  def highestOffset: Long = _highestOffset
+
+  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
+    reader.lastOffset()
+    var index = 0
+    while (reader.hasNext) {
+      index += 1
+      reader.next()
+    }
+    _highestOffset = reader.lastOffset().orElse(reader.baseOffset() + index)
+    reader.close()
+  }
+
+  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
+    _highestOffset = reader.lastContainedLogOffset()
+    reader.close()
+  }
+}
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 12f4bfb2c3e..115446572e1 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -25,12 +25,13 @@ import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.security.authorizer.AclAuthorizer.{NoAcls, VersionedAcls}
 import kafka.security.authorizer.AclEntry
-import kafka.server.ConfigType
+import kafka.server.{ConfigType, KafkaConfig}
 import kafka.utils.Logging
 import kafka.zk.TopicZNode.TopicIdReplicaAssignment
 import kafka.zookeeper._
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
@@ -2312,4 +2313,20 @@ object KafkaZkClient {
       case _ => throw new IllegalStateException(s"Cannot unwrap $response because it is not a MultiResponse")
     }
   }
+
+  def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {
+    val secureAclsEnabled = config.zkEnableSecureAcls
+    val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
+
+    if (secureAclsEnabled && !isZkSecurityEnabled)
+      throw new java.lang.SecurityException(
+        s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client TLS configuration identifying at least " +
+          s"${KafkaConfig.ZkSslClientEnableProp}, ${KafkaConfig.ZkClientCnxnSocketProp}, and " +
+          s"${KafkaConfig.ZkSslKeyStoreLocationProp} was not present and the verification of the JAAS login file failed " +
+          s"${JaasUtils.zkSecuritySysConfigString}")
+
+    KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+      config.zkMaxInFlightRequests, time, name = name, zkClientConfig = zkClientConfig,
+      createChrootIfNecessary = true)
+  }
 }
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 9058508fa94..f149c82e7c3 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -115,6 +115,8 @@ public interface ClusterInstance {
      */
     Map<Integer, BrokerFeatures> brokerFeatures();
 
+    String clusterId();
+
     /**
      * The underlying object which is responsible for setting up and tearing down the cluster.
      */
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index f5c281ff249..f7eb0a50179 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -183,6 +183,14 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
             ));
         }
 
+        @Override
+        public String clusterId() {
+            return controllers().findFirst().map(ControllerServer::clusterId).orElse(
+                brokers().findFirst().map(BrokerServer::clusterId).orElseThrow(
+                    () -> new RuntimeException("No controllers or brokers!"))
+            );
+        }
+
         public Collection<ControllerServer> controllerServers() {
             return controllers().collect(Collectors.toList());
         }
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 18a85e2d7bf..e77ef4a3ce8 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -237,7 +237,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
                 .filter(broker -> broker.kafkaController().isActive())
                 .map(KafkaServer::socketServer)
                 .findFirst()
-                .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+                .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
         }
 
         @Override
@@ -248,6 +248,12 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
             ));
         }
 
+        @Override
+        public String clusterId() {
+            return servers().findFirst().map(KafkaServer::clusterId).orElseThrow(
+                () -> new RuntimeException("No broker instances found"));
+        }
+
         @Override
         public ClusterType clusterType() {
             return ClusterType.ZK;
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
new file mode 100644
index 00000000000..8b85a8f81cc
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.common.MetadataVersion
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Tag, Timeout}
+
+import java.util.concurrent.{TimeUnit, TimeoutException}
+import scala.jdk.CollectionConverters._
+
+
+/**
+ * This test creates a full ZK cluster and a controller-only KRaft cluster and configures the ZK brokers to register
+ * themselves with the KRaft controller. This is mainly a happy-path test since the only way to reliably test the
+ * failure paths is to use timeouts. See {@link unit.kafka.server.BrokerRegistrationRequestTest} for integration test
+ * of just the broker registration path.
+ */
+@Timeout(120)
+@Tag("integration")
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class KafkaServerKRaftRegistrationTest {
+
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+  def testRegisterZkBrokerInKraft1(zkCluster: ClusterInstance): Unit = {
+    val clusterId = zkCluster.clusterId()
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      val props = kraftCluster.controllerClientProperties()
+      val voters = props.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+      zkCluster.waitForReadyBrokers()
+
+      try {
+        // Wait until all three ZK brokers are registered with KRaft controller
+        readyFuture.get(30, TimeUnit.SECONDS)
+      } catch {
+        case _: TimeoutException => fail("Did not see 3 brokers within 30 seconds")
+        case t: Throwable => fail("Had some other error waiting for brokers", t)
+      }
+    } finally {
+      kraftCluster.close()
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
new file mode 100644
index 00000000000..2bb1314a789
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import kafka.server.{BrokerToControllerChannelManager, ControllerNodeProvider, ControllerRequestCompletionHandler}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.{BrokerRegistrationRequestData, BrokerRegistrationResponseData}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{BrokerRegistrationRequest, BrokerRegistrationResponse}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.server.common.MetadataVersion
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Tag, Timeout}
+
+import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
+
+/**
+ * This test simulates a broker registering with the KRaft quorum under different configurations.
+ */
+@Timeout(120)
+@Tag("integration")
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class BrokerRegistrationRequestTest {
+
+  def brokerToControllerChannelManager(clusterInstance: ClusterInstance): BrokerToControllerChannelManager = {
+    BrokerToControllerChannelManager(
+      new ControllerNodeProvider() {
+        override def get(): Option[Node] = Some(new Node(
+          clusterInstance.anyControllerSocketServer().config.nodeId,
+          "127.0.0.1",
+          clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName().get()),
+        ))
+
+        override def listenerName: ListenerName = clusterInstance.controllerListenerName().get()
+
+        override def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+
+        override def saslMechanism: String = ""
+      },
+      Time.SYSTEM,
+      new Metrics(),
+      clusterInstance.anyControllerSocketServer().config,
+      "heartbeat",
+      Some("heartbeat"),
+      10000
+    )
+  }
+
+  def sendAndRecieve(
+    channelManager: BrokerToControllerChannelManager,
+    req: BrokerRegistrationRequestData
+  ): BrokerRegistrationResponseData = {
+    val responseFuture = new CompletableFuture[BrokerRegistrationResponseData]()
+    channelManager.sendRequest(new BrokerRegistrationRequest.Builder(req), new ControllerRequestCompletionHandler() {
+      override def onTimeout(): Unit = responseFuture.completeExceptionally(new TimeoutException())
+
+      override def onComplete(response: ClientResponse): Unit =
+        responseFuture.complete(response.responseBody().asInstanceOf[BrokerRegistrationResponse].data())
+    })
+    responseFuture.get(30, TimeUnit.SECONDS)
+  }
+
+  def registerBroker(
+    channelManager: BrokerToControllerChannelManager,
+    clusterId: String,
+    brokerId: Int,
+    zk: Boolean,
+    ibpToSend: Option[(MetadataVersion, MetadataVersion)]
+  ): Errors = {
+    val features = new BrokerRegistrationRequestData.FeatureCollection()
+
+    ibpToSend foreach {
+      case (min, max) =>
+        features.add(new BrokerRegistrationRequestData.Feature()
+          .setName(MetadataVersion.FEATURE_NAME)
+          .setMinSupportedVersion(min.featureLevel())
+          .setMaxSupportedVersion(max.featureLevel())
+        )
+    }
+
+    val req = new BrokerRegistrationRequestData()
+      .setBrokerId(brokerId)
+      .setClusterId(clusterId)
+      .setIncarnationId(Uuid.randomUuid())
+      .setIsMigratingZkBroker(zk)
+      .setFeatures(features)
+
+    Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
+  }
+
+  @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+    serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
+  def testRegisterZkWithKRaftMigrationDisabled(clusterInstance: ClusterInstance): Unit = {
+    val clusterId = clusterInstance.clusterId()
+    val channelManager = brokerToControllerChannelManager(clusterInstance)
+    try {
+      channelManager.start()
+
+      assertEquals(
+        Errors.BROKER_ID_NOT_REGISTERED,
+        registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
+
+      assertEquals(
+        Errors.BROKER_ID_NOT_REGISTERED,
+        registerBroker(channelManager, clusterId, 100, true, None))
+
+      assertEquals(
+        Errors.BROKER_ID_NOT_REGISTERED,
+        registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+
+      assertEquals(
+        Errors.NONE,
+        registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+    } finally {
+      channelManager.shutdown()
+    }
+  }
+
+  @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
+    serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
+  def testRegisterZkWithKRaftOldMetadataVersion(clusterInstance: ClusterInstance): Unit = {
+    val clusterId = clusterInstance.clusterId()
+    val channelManager = brokerToControllerChannelManager(clusterInstance)
+    try {
+      channelManager.start()
+
+      assertEquals(
+        Errors.BROKER_ID_NOT_REGISTERED,
+        registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
+
+      assertEquals(
+        Errors.BROKER_ID_NOT_REGISTERED,
+        registerBroker(channelManager, clusterId, 100, true, None))
+
+      assertEquals(
+        Errors.BROKER_ID_NOT_REGISTERED,
+        registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+
+      assertEquals(
+        Errors.NONE,
+        registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_4_IV0))))
+    } finally {
+      channelManager.shutdown()
+    }
+  }
+
+  @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+    serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
+  def testRegisterZkWithKRaftMigrationEnabled(clusterInstance: ClusterInstance): Unit = {
+    val clusterId = clusterInstance.clusterId()
+    val channelManager = brokerToControllerChannelManager(clusterInstance)
+    try {
+      channelManager.start()
+
+      assertEquals(
+        Errors.NONE,
+        registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+
+      assertEquals(
+        Errors.UNSUPPORTED_VERSION,
+        registerBroker(channelManager, clusterId, 100, true, None))
+
+      assertEquals(
+        Errors.UNSUPPORTED_VERSION,
+        registerBroker(channelManager, clusterId, 100, true, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_3_IV3))))
+
+      assertEquals(
+        Errors.NONE,
+        registerBroker(channelManager, clusterId, 100, false, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
+    } finally {
+      channelManager.shutdown()
+    }
+  }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 5bc5bd2c3af..95aae773fb4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -88,6 +89,7 @@ public class ClusterControlManager {
         private ReplicaPlacer replicaPlacer = null;
         private ControllerMetrics controllerMetrics = null;
         private FeatureControlManager featureControl = null;
+        private boolean zkMigrationEnabled = false;
 
         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -129,6 +131,11 @@ public class ClusterControlManager {
             return this;
         }
 
+        Builder setZkMigrationEnabled(boolean zkMigrationEnabled) {
+            this.zkMigrationEnabled = zkMigrationEnabled;
+            return this;
+        }
+
         ClusterControlManager build() {
             if (logContext == null) {
                 logContext = new LogContext();
@@ -155,7 +162,8 @@ public class ClusterControlManager {
                 sessionTimeoutNs,
                 replicaPlacer,
                 controllerMetrics,
-                featureControl
+                featureControl,
+                zkMigrationEnabled
             );
         }
     }
@@ -247,6 +255,8 @@ public class ClusterControlManager {
      */
     private final FeatureControlManager featureControl;
 
+    private final boolean zkMigrationEnabled;
+
     private ClusterControlManager(
         LogContext logContext,
         String clusterId,
@@ -255,7 +265,8 @@ public class ClusterControlManager {
         long sessionTimeoutNs,
         ReplicaPlacer replicaPlacer,
         ControllerMetrics metrics,
-        FeatureControlManager featureControl
+        FeatureControlManager featureControl,
+        boolean zkMigrationEnabled
     ) {
         this.logContext = logContext;
         this.clusterId = clusterId;
@@ -269,6 +280,7 @@ public class ClusterControlManager {
         this.readyBrokersFuture = Optional.empty();
         this.controllerMetrics = metrics;
         this.featureControl = featureControl;
+        this.zkMigrationEnabled = zkMigrationEnabled;
     }
 
     ReplicaPlacer replicaPlacer() {
@@ -311,6 +323,10 @@ public class ClusterControlManager {
             .collect(Collectors.toSet());
     }
 
+    boolean zkRegistrationAllowed() {
+        return zkMigrationEnabled && featureControl.metadataVersion().isMigrationSupported();
+    }
+
     /**
      * Process an incoming broker registration request.
      */
@@ -341,7 +357,13 @@ public class ClusterControlManager {
             }
         }
 
-        RegisterBrokerRecord record = new RegisterBrokerRecord().setBrokerId(brokerId).
+        if (request.isMigratingZkBroker() && !zkRegistrationAllowed()) {
+            throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
+        }
+
+        RegisterBrokerRecord record = new RegisterBrokerRecord().
+            setBrokerId(brokerId).
+            setIsMigratingZkBroker(request.isMigratingZkBroker()).
             setIncarnationId(request.incarnationId()).
             setBrokerEpoch(brokerEpoch).
             setRack(request.rack());
@@ -423,7 +445,7 @@ public class ClusterControlManager {
                 new BrokerRegistration(brokerId, record.brokerEpoch(),
                     record.incarnationId(), listeners, features,
                     Optional.ofNullable(record.rack()), record.fenced(),
-                    record.inControlledShutdown()));
+                    record.inControlledShutdown(), record.isMigratingZkBroker()));
         updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
         if (heartbeatManager != null) {
             if (prevRegistration != null) heartbeatManager.remove(brokerId);
@@ -677,6 +699,9 @@ public class ClusterControlManager {
             if (metadataVersion.isInControlledShutdownStateSupported()) {
                 record.setInControlledShutdown(registration.inControlledShutdown());
             }
+            if (metadataVersion.isMigrationSupported()) {
+                record.setIsMigratingZkBroker(registration.isMigratingZkBroker());
+            }
             return singletonList(new ApiMessageAndVersion(record,
                 metadataVersion.registerBrokerRecordVersion()));
         }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3dba5b401d6..6716044bef5 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -181,6 +181,7 @@ public final class QuorumController implements Controller {
         private Map<String, Object> staticConfig = Collections.emptyMap();
         private BootstrapMetadata bootstrapMetadata = null;
         private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
+        private boolean zkMigrationEnabled = false;
 
         public Builder(int nodeId, String clusterId) {
             this.nodeId = nodeId;
@@ -306,6 +307,11 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setZkMigrationEnabled(boolean zkMigrationEnabled) {
+            this.zkMigrationEnabled = zkMigrationEnabled;
+            return this;
+        }
+
         @SuppressWarnings("unchecked")
         public QuorumController build() throws Exception {
             if (raftClient == null) {
@@ -357,7 +363,8 @@ public final class QuorumController implements Controller {
                     authorizer,
                     staticConfig,
                     bootstrapMetadata,
-                    maxRecordsPerBatch
+                    maxRecordsPerBatch,
+                    zkMigrationEnabled
                 );
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
@@ -1848,7 +1855,8 @@ public final class QuorumController implements Controller {
         Optional<ClusterMetadataAuthorizer> authorizer,
         Map<String, Object> staticConfig,
         BootstrapMetadata bootstrapMetadata,
-        int maxRecordsPerBatch
+        int maxRecordsPerBatch,
+        boolean zkMigrationEnabled
     ) {
         this.fatalFaultHandler = fatalFaultHandler;
         this.logContext = logContext;
@@ -1892,6 +1900,7 @@ public final class QuorumController implements Controller {
             setReplicaPlacer(replicaPlacer).
             setControllerMetrics(controllerMetrics).
             setFeatureControlManager(featureControl).
+            setZkMigrationEnabled(zkMigrationEnabled).
             build();
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index bf895fb8a11..6aa40195070 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -57,7 +57,9 @@ public class BrokerRegistration {
     private final Optional<String> rack;
     private final boolean fenced;
     private final boolean inControlledShutdown;
+    private final boolean isMigratingZkBroker;
 
+    // Visible for testing
     public BrokerRegistration(int id,
                               long epoch,
                               Uuid incarnationId,
@@ -67,9 +69,23 @@ public class BrokerRegistration {
                               boolean fenced,
                               boolean inControlledShutdown) {
         this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
-            fenced, inControlledShutdown);
+            fenced, inControlledShutdown, false);
     }
 
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced,
+                              boolean inControlledShutdown,
+                              boolean isMigratingZkBroker) {
+        this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
+            fenced, inControlledShutdown, isMigratingZkBroker);
+    }
+
+    // Visible for testing
     public BrokerRegistration(int id,
                               long epoch,
                               Uuid incarnationId,
@@ -78,6 +94,18 @@ public class BrokerRegistration {
                               Optional<String> rack,
                               boolean fenced,
                               boolean inControlledShutdown) {
+        this(id, epoch, incarnationId, listeners, supportedFeatures, rack, fenced, inControlledShutdown, false);
+    }
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              Map<String, Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced,
+                              boolean inControlledShutdown,
+                              boolean isMigratingZkBroker) {
         this.id = id;
         this.epoch = epoch;
         this.incarnationId = incarnationId;
@@ -95,6 +123,7 @@ public class BrokerRegistration {
         this.rack = rack;
         this.fenced = fenced;
         this.inControlledShutdown = inControlledShutdown;
+        this.isMigratingZkBroker = isMigratingZkBroker;
     }
 
     public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -117,7 +146,8 @@ public class BrokerRegistration {
             supportedFeatures,
             Optional.ofNullable(record.rack()),
             record.fenced(),
-            record.inControlledShutdown());
+            record.inControlledShutdown(),
+            record.isMigratingZkBroker());
     }
 
     public int id() {
@@ -160,6 +190,10 @@ public class BrokerRegistration {
         return inControlledShutdown;
     }
 
+    public boolean isMigratingZkBroker() {
+        return isMigratingZkBroker;
+    }
+
     public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
         RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
             setBrokerId(id).
@@ -176,6 +210,14 @@ public class BrokerRegistration {
             }
         }
 
+        if (isMigratingZkBroker) {
+            if (options.metadataVersion().isMigrationSupported()) {
+                registrationRecord.setIsMigratingZkBroker(isMigratingZkBroker);
+            } else {
+                options.handleLoss("the isMigratingZkBroker state of one or more brokers");
+            }
+        }
+
         for (Entry<String, Endpoint> entry : listeners.entrySet()) {
             Endpoint endpoint = entry.getValue();
             registrationRecord.endPoints().add(new BrokerEndpoint().
@@ -199,7 +241,7 @@ public class BrokerRegistration {
     @Override
     public int hashCode() {
         return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
-            rack, fenced, inControlledShutdown);
+            rack, fenced, inControlledShutdown, isMigratingZkBroker);
     }
 
     @Override
@@ -213,7 +255,8 @@ public class BrokerRegistration {
             other.supportedFeatures.equals(supportedFeatures) &&
             other.rack.equals(rack) &&
             other.fenced == fenced &&
-            other.inControlledShutdown == inControlledShutdown;
+            other.inControlledShutdown == inControlledShutdown &&
+            other.isMigratingZkBroker == isMigratingZkBroker;
     }
 
     @Override
@@ -234,6 +277,7 @@ public class BrokerRegistration {
         bld.append(", rack=").append(rack);
         bld.append(", fenced=").append(fenced);
         bld.append(", inControlledShutdown=").append(inControlledShutdown);
+        bld.append(", isMigratingZkBroker=").append(isMigratingZkBroker);
         bld.append(")");
         return bld.toString();
     }
@@ -256,7 +300,8 @@ public class BrokerRegistration {
             supportedFeatures,
             rack,
             newFenced,
-            newInControlledShutdownChange
+            newInControlledShutdownChange,
+            isMigratingZkBroker
         );
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 4844b765007..afdb15e7272 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -80,13 +80,13 @@ public class BrokerRegistrationTest {
             "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
-            "rack=Optional.empty, fenced=true, inControlledShutdown=false)",
+            "rack=Optional.empty, fenced=true, inControlledShutdown=false, isMigratingZkBroker=false)",
             REGISTRATIONS.get(1).toString());
         assertEquals("BrokerRegistration(id=2, epoch=0, " +
             "incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 2-3}, " +
-            "rack=Optional[myrack], fenced=false, inControlledShutdown=true)",
+            "rack=Optional[myrack], fenced=false, inControlledShutdown=true, isMigratingZkBroker=false)",
             REGISTRATIONS.get(2).toString());
     }
 
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index ffe7f1e316f..9fb7c0ff4ea 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -265,8 +265,15 @@ public enum MetadataVersion {
         return this.isAtLeast(IBP_3_3_IV3);
     }
 
+    public boolean isMigrationSupported() {
+        return this.isAtLeast(MetadataVersion.IBP_3_4_IV0);
+    }
+
     public short registerBrokerRecordVersion() {
-        if (isInControlledShutdownStateSupported()) {
+        if (isMigrationSupported()) {
+            // new isMigratingZkBroker field
+            return (short) 2;
+        } else if (isInControlledShutdownStateSupported()) {
             return (short) 1;
         } else {
             return (short) 0;
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 51933c5ea01..6c76a82bfa6 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -332,8 +332,14 @@ class MetadataVersionTest {
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class)
     public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {
-        short expectedVersion = metadataVersion.isAtLeast(IBP_3_3_IV3) ?
-            (short) 1 : (short) 0;
+        final short expectedVersion;
+        if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_4_IV0)) {
+            expectedVersion = 2;
+        } else if (metadataVersion.isAtLeast(IBP_3_3_IV3)) {
+            expectedVersion = 1;
+        } else {
+            expectedVersion = 0;
+        }
         assertEquals(expectedVersion, metadataVersion.registerBrokerRecordVersion());
     }
 }