You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/04/26 14:20:58 UTC

[kafka] branch trunk updated: KAFKA-14805 KRaft controller supports pre-migration mode (#13407)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c1b5c75d927 KAFKA-14805 KRaft controller supports pre-migration mode (#13407)
c1b5c75d927 is described below

commit c1b5c75d9271638776392822a094e9e7ef37f490
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Apr 26 10:20:30 2023 -0400

    KAFKA-14805 KRaft controller supports pre-migration mode (#13407)
    
    This patch adds the concept of pre-migration mode to the KRaft controller. While in this mode,
    the controller will only allow certain write operations. The purpose of this is to disallow metadata
    changes when the controller is waiting for the ZK migration records to be committed.
    
    The following ControllerWriteEvent operations are permitted in pre-migration mode:
    
    * completeActivation
    * maybeFenceReplicas
    * writeNoOpRecord
    * processBrokerHeartbeat
    * registerBroker (only for migrating ZK brokers)
    * unregisterBroker
    
    Raft events and other controller events do not follow the same code path as ControllerWriteEvent,
    so they are not affected by this new behavior.
    
    This patch also adds a new metric as defined in KIP-868: kafka.controller:type=KafkaController,name=ZkMigrationState
    
    In order to support upgrades from 3.4.0, this patch also redefines the enum value 1 to mean
    MIGRATION rather than PRE_MIGRATION.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Colin P. McCabe <cm...@apache.org>
---
 checkstyle/suppressions.xml                        |   4 +-
 .../test/junit/RaftClusterInvocationContext.java   |  15 +-
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |   8 +-
 .../server/BrokerRegistrationRequestTest.scala     |  89 +++++--
 .../kafka/controller/ClusterControlManager.java    |   5 +
 .../kafka/controller/FeatureControlManager.java    |  44 +++-
 .../kafka/controller/MigrationControlManager.java  |  38 ---
 .../kafka/controller/ProducerIdControlManager.java |   2 +-
 .../apache/kafka/controller/QuorumController.java  | 280 ++++++++++++++++-----
 .../controller/ReplicationControlManager.java      |  13 +-
 .../metrics/ControllerMetadataMetrics.java         |  20 +-
 .../ControllerMetadataMetricsPublisher.java        |   4 +
 .../java/org/apache/kafka/image/FeaturesDelta.java |  22 +-
 .../java/org/apache/kafka/image/FeaturesImage.java |  40 ++-
 .../java/org/apache/kafka/image/MetadataDelta.java |   7 +-
 .../metadata/migration/KRaftMigrationDriver.java   | 182 +++++++++-----
 .../kafka/metadata/migration/ZkMigrationState.java |  30 ++-
 .../kafka/metadata/migration/ZkRecordConsumer.java |   2 +-
 .../common/metadata/ZkMigrationRecord.json         |   7 +-
 .../controller/FeatureControlManagerTest.java      |  59 ++++-
 .../kafka/controller/QuorumControllerTest.java     | 189 +++++++++++++-
 .../metrics/ControllerMetadataMetricsTest.java     |   3 +-
 .../org/apache/kafka/image/FeaturesImageTest.java  |  21 +-
 .../image/publisher/SnapshotGeneratorTest.java     |   2 +-
 .../org/apache/kafka/metadata/RecordTestUtils.java |   8 +
 .../migration/KRaftMigrationDriverTest.java        |  63 ++++-
 tests/docker/Dockerfile                            |   2 +
 .../tests/core/zookeeper_migration_test.py         | 141 ++++++++++-
 28 files changed, 1049 insertions(+), 251 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index af271ee77bd..67f16f7d766 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -306,9 +306,9 @@
     <suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
               files="(QuorumController).java"/>
     <suppress checks="CyclomaticComplexity"
-              files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
+              files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager).java"/>
     <suppress checks="NPathComplexity"
-              files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager).java"/>
+              files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager|ClusterControlManager).java"/>
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
             files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="BooleanExpressionComplexity"
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index b053eaef251..82ec5664259 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -105,19 +105,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
                     zkReference.set(new EmbeddedZookeeper());
                     builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zkReference.get().port()));
                 }
-
                 // Copy properties into the TestKit builder
                 clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
                 // KAFKA-12512 need to pass security protocol and listener name here
                 KafkaClusterTestKit cluster = builder.build();
                 clusterReference.set(cluster);
                 cluster.format();
-                cluster.startup();
-                kafka.utils.TestUtils.waitUntilTrue(
-                    () -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
-                    () -> "Broker never made it to RUNNING state.",
-                    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
-                    100L);
+                if (clusterConfig.isAutoStart()) {
+                    cluster.startup();
+                    kafka.utils.TestUtils.waitUntilTrue(
+                        () -> cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                        () -> "Broker never made it to RUNNING state.",
+                        org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+                        100L);
+                }
             },
             (AfterTestExecutionCallback) context -> clusterInstance.stop(),
             new ClusterInstanceParameterResolver(clusterInstance),
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 56ae854eeb0..33eefe75752 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
-import org.junit.jupiter.api.{Disabled, Timeout}
+import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.api.extension.ExtendWith
 import org.slf4j.LoggerFactory
 
@@ -120,6 +120,11 @@ class ZkMigrationIntegrationTest {
     }
   }
 
+  /**
+   * Test ZkMigrationClient against a real ZooKeeper-backed Kafka cluster. This test creates a ZK cluster
+   * and modifies data using AdminClient. The ZkMigrationClient is then used to read the metadata from ZK
+   * as would happen during a migration. The generated records are then verified.
+   */
   @ClusterTest(brokers = 3, clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_4_IV0)
   def testMigrate(clusterInstance: ClusterInstance): Unit = {
     val admin = clusterInstance.createAdminClient()
@@ -182,7 +187,6 @@ class ZkMigrationIntegrationTest {
     migrationState = migrationClient.releaseControllerLeadership(migrationState)
   }
 
-  @Disabled("Will be fixed by KAFKA-14840")
   @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 25594c2de13..5d46687b58e 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -15,23 +15,24 @@
  * limitations under the License.
  */
 
-package unit.kafka.server
+package kafka.server
 
-import kafka.server.{BrokerToControllerChannelManager, ControllerInformation, ControllerNodeProvider, ControllerRequestCompletionHandler}
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.annotation._
 import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
 import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.message.{BrokerRegistrationRequestData, BrokerRegistrationResponseData}
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.message.{BrokerRegistrationRequestData, CreateTopicsRequestData}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{BrokerRegistrationRequest, BrokerRegistrationResponse}
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{Tag, Timeout}
 
@@ -74,18 +75,19 @@ class BrokerRegistrationRequestTest {
     )
   }
 
-  def sendAndRecieve(
+  def sendAndReceive[T <: AbstractRequest, R <: AbstractResponse](
     channelManager: BrokerToControllerChannelManager,
-    req: BrokerRegistrationRequestData
-  ): BrokerRegistrationResponseData = {
-    val responseFuture = new CompletableFuture[BrokerRegistrationResponseData]()
-    channelManager.sendRequest(new BrokerRegistrationRequest.Builder(req), new ControllerRequestCompletionHandler() {
+    reqBuilder: AbstractRequest.Builder[T],
+    timeoutMs: Int
+  ): R = {
+    val responseFuture = new CompletableFuture[R]()
+    channelManager.sendRequest(reqBuilder, new ControllerRequestCompletionHandler() {
       override def onTimeout(): Unit = responseFuture.completeExceptionally(new TimeoutException())
 
       override def onComplete(response: ClientResponse): Unit =
-        responseFuture.complete(response.responseBody().asInstanceOf[BrokerRegistrationResponse].data())
+        responseFuture.complete(response.responseBody().asInstanceOf[R])
     })
-    responseFuture.get(30, TimeUnit.SECONDS)
+    responseFuture.get(timeoutMs, TimeUnit.MILLISECONDS)
   }
 
   def registerBroker(
@@ -113,7 +115,22 @@ class BrokerRegistrationRequestTest {
       .setIsMigratingZkBroker(zkEpoch.isDefined)
       .setFeatures(features)
 
-    Errors.forCode(sendAndRecieve(channelManager, req).errorCode())
+    val resp = sendAndReceive[BrokerRegistrationRequest, BrokerRegistrationResponse](
+      channelManager, new BrokerRegistrationRequest.Builder(req), 30000)
+    Errors.forCode(resp.data().errorCode())
+  }
+
+
+  def createTopics(channelManager: BrokerToControllerChannelManager,
+                   topicName: String): Errors = {
+    val createTopics = new CreateTopicsRequestData()
+    createTopics.setTopics(new CreateTopicsRequestData.CreatableTopicCollection())
+    createTopics.topics().add(new CreatableTopic().setName(topicName).setNumPartitions(10).setReplicationFactor(1))
+    createTopics.setTimeoutMs(500)
+
+    val req = new CreateTopicsRequest.Builder(createTopics)
+    val resp = sendAndReceive[CreateTopicsRequest, CreateTopicsResponse](channelManager, req, 3000).data()
+    Errors.forCode(resp.topics().find(topicName).errorCode())
   }
 
   @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
@@ -145,25 +162,29 @@ class BrokerRegistrationRequestTest {
   }
 
   @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
-    serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
-  def testRegisterZkWithKRaftOldMetadataVersion(clusterInstance: ClusterInstance): Unit = {
+    serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
+  def testRegisterZkWith33Controller(clusterInstance: ClusterInstance): Unit = {
+    // Verify that a controller running an old metadata.version cannot register a ZK broker
     val clusterId = clusterInstance.clusterId()
     val channelManager = brokerToControllerChannelManager(clusterInstance)
     try {
       channelManager.start()
-
+      // Invalid registration (isMigratingZkBroker, but MV does not support migrations)
       assertEquals(
         Errors.BROKER_ID_NOT_REGISTERED,
-        registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV0))))
+        registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV0, MetadataVersion.IBP_3_3_IV3))))
 
+      // No features (MV) sent with registration, controller can't verify
       assertEquals(
         Errors.BROKER_ID_NOT_REGISTERED,
         registerBroker(channelManager, clusterId, 100, Some(1), None))
 
+      // Given MV is too high for controller to support
       assertEquals(
         Errors.BROKER_ID_NOT_REGISTERED,
         registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
 
+      // Controller supports this MV and isMigratingZkBroker is false, so this one works
       assertEquals(
         Errors.NONE,
         registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_4_IV0))))
@@ -172,9 +193,16 @@ class BrokerRegistrationRequestTest {
     }
   }
 
-  @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+  @ClusterTest(
+    clusterType = Type.KRAFT,
+    brokers = 1,
+    controllers = 1,
+    metadataVersion = MetadataVersion.IBP_3_4_IV0,
+    autoStart = AutoStart.NO,
     serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
   def testRegisterZkWithKRaftMigrationEnabled(clusterInstance: ClusterInstance): Unit = {
+    clusterInstance.asInstanceOf[RaftClusterInstance].controllers().forEach(_.startup())
+
     val clusterId = clusterInstance.clusterId()
     val channelManager = brokerToControllerChannelManager(clusterInstance)
     try {
@@ -192,11 +220,32 @@ class BrokerRegistrationRequestTest {
         Errors.UNSUPPORTED_VERSION,
         registerBroker(channelManager, clusterId, 100, Some(1), Some((MetadataVersion.IBP_3_3_IV3, MetadataVersion.IBP_3_3_IV3))))
 
+      // Cannot register KRaft broker when in pre-migration
       assertEquals(
-        Errors.NONE,
+        Errors.BROKER_ID_NOT_REGISTERED,
         registerBroker(channelManager, clusterId, 100, None, Some((MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_4_IV0))))
     } finally {
       channelManager.shutdown()
     }
   }
+
+  /**
+   * Start a KRaft cluster with migrations enabled, verify that the controller does not accept metadata changes
+   * through the RPCs. The migration never proceeds past pre-migration since no ZK brokers are registered.
+   */
+  @ClusterTests(Array(
+    new ClusterTest(clusterType = Type.KRAFT, autoStart = AutoStart.NO, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+      serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
+  ))
+  def testNoMetadataChangesInPreMigrationMode(clusterInstance: ClusterInstance): Unit = {
+    clusterInstance.asInstanceOf[RaftClusterInstance].controllers().forEach(_.startup())
+
+    val channelManager = brokerToControllerChannelManager(clusterInstance)
+    try {
+      channelManager.start()
+      assertThrows(classOf[TimeoutException], () => createTopics(channelManager, "test-pre-migration"))
+    } finally {
+      channelManager.shutdown()
+    }
+  }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 98981b4c1f6..f0986f1d1a2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -340,6 +340,11 @@ public class ClusterControlManager {
             throw new BrokerIdNotRegisteredException("Controller does not support registering ZK brokers.");
         }
 
+        if (!request.isMigratingZkBroker() && featureControl.inPreMigrationMode()) {
+            throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " +
+                "brokers until the metadata migration is complete.");
+        }
+
         RegisterBrokerRecord record = new RegisterBrokerRecord().
             setBrokerId(brokerId).
             setIsMigratingZkBroker(request.isMigratingZkBroker()).
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 7b2a9308e96..319c9d537ee 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -30,11 +30,13 @@ import java.util.function.Consumer;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -46,6 +48,7 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
 
 
 public class FeatureControlManager {
+
     public static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
@@ -85,11 +88,13 @@ public class FeatureControlManager {
                 quorumFeatures = new QuorumFeatures(0, new ApiVersions(), QuorumFeatures.defaultFeatureMap(),
                         Collections.emptyList());
             }
-            return new FeatureControlManager(logContext,
+            return new FeatureControlManager(
+                logContext,
                 quorumFeatures,
                 snapshotRegistry,
                 metadataVersion,
-                minimumBootstrapVersion);
+                minimumBootstrapVersion
+            );
         }
     }
 
@@ -110,6 +115,11 @@ public class FeatureControlManager {
      */
     private final TimelineObject<MetadataVersion> metadataVersion;
 
+    /**
+     * The current ZK migration state
+     */
+    private final TimelineObject<ZkMigrationState> migrationControlState;
+
     /**
      * The minimum bootstrap version that we can't downgrade before.
      */
@@ -127,6 +137,7 @@ public class FeatureControlManager {
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
         this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
         this.minimumBootstrapVersion = minimumBootstrapVersion;
+        this.migrationControlState = new TimelineObject<>(snapshotRegistry, ZkMigrationState.NONE);
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
@@ -153,6 +164,10 @@ public class FeatureControlManager {
         return metadataVersion.get();
     }
 
+    ZkMigrationState zkMigrationState() {
+        return migrationControlState.get();
+    }
+
     private ApiError updateFeature(
         String featureName,
         short newVersion,
@@ -232,6 +247,7 @@ public class FeatureControlManager {
         Consumer<ApiMessageAndVersion> recordConsumer
     ) {
         MetadataVersion currentVersion = metadataVersion();
+        ZkMigrationState zkMigrationState = zkMigrationState();
         final MetadataVersion newVersion;
         try {
             newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
@@ -239,6 +255,12 @@ public class FeatureControlManager {
             return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
         }
 
+        // Don't allow metadata.version changes while we're migrating
+        if (zkMigrationState.inProgress()) {
+            return invalidMetadataVersion(newVersionLevel, "Unable to modify metadata.version while a " +
+                "ZK migration is in progress.");
+        }
+
         // We cannot set a version earlier than IBP_3_3_IV0, since that was the first version that contained
         // FeatureLevelRecord itself.
         if (newVersion.isLessThan(minimumBootstrapVersion)) {
@@ -266,6 +288,7 @@ public class FeatureControlManager {
             new FeatureLevelRecord()
                 .setName(MetadataVersion.FEATURE_NAME)
                 .setFeatureLevel(newVersionLevel), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+
         return ApiError.NONE;
     }
 
@@ -284,6 +307,16 @@ public class FeatureControlManager {
         return new FinalizedControllerFeatures(features, epoch);
     }
 
+    /**
+     * Tests if the controller should be preventing metadata updates due to being in the PRE_MIGRATION
+     * state. If the controller does not yet support migrations (before 3.4-IV0), then the migration state
+     * will be NONE and this will return false. Once the controller has been upgraded to a version that supports
+     * migrations, then this method checks if the migration state is equal to PRE_MIGRATION.
+     */
+    boolean inPreMigrationMode() {
+        return migrationControlState.get().equals(ZkMigrationState.PRE_MIGRATION);
+    }
+
     public void replay(FeatureLevelRecord record) {
         VersionRange range = quorumFeatures.localSupportedFeature(record.name());
         if (!range.contains(record.featureLevel())) {
@@ -304,6 +337,13 @@ public class FeatureControlManager {
         }
     }
 
+    public void replay(ZkMigrationStateRecord record) {
+        ZkMigrationState recordState = ZkMigrationState.of(record.zkMigrationState());
+        ZkMigrationState currentState = migrationControlState.get();
+        log.info("Transitioning ZK migration state from {} to {}", currentState, recordState);
+        migrationControlState.set(recordState);
+    }
+
     boolean isControllerId(int nodeId) {
         return quorumFeatures.isControllerId(nodeId);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java
deleted file mode 100644
index 7f477785129..00000000000
--- a/metadata/src/main/java/org/apache/kafka/controller/MigrationControlManager.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
-import org.apache.kafka.metadata.migration.ZkMigrationState;
-import org.apache.kafka.timeline.SnapshotRegistry;
-import org.apache.kafka.timeline.TimelineObject;
-
-public class MigrationControlManager {
-    private final TimelineObject<ZkMigrationState> zkMigrationState;
-
-    MigrationControlManager(SnapshotRegistry snapshotRegistry) {
-        zkMigrationState = new TimelineObject<>(snapshotRegistry, ZkMigrationState.NONE);
-    }
-
-    ZkMigrationState zkMigrationState() {
-        return zkMigrationState.get();
-    }
-
-    void replay(ZkMigrationStateRecord record) {
-        zkMigrationState.set(ZkMigrationState.of(record.zkMigrationState()));
-    }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index 2c23bba9dd7..932af0c2e75 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -69,7 +69,7 @@ public class ProducerIdControlManager {
         ProducerIdsBlock nextBlock = nextProducerBlock.get();
         if (nextBlock != ProducerIdsBlock.EMPTY && record.nextProducerId() <= nextBlock.firstProducerId()) {
             throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
-                " is not greater than current next Producer ID (" + nextBlock.firstProducerId() + ")");
+                " is not greater than current next Producer ID in block (" + nextBlock + ")");
         } else {
             nextProducerBlock.set(new ProducerIdsBlock(record.brokerId(), record.nextProducerId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
             brokerEpoch.set(record.brokerEpoch());
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index cc048488964..7e5499fb076 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -113,6 +113,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
@@ -133,6 +134,8 @@ import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME;
+import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.RUNS_IN_PREMIGRATION;
 
 
 /**
@@ -414,7 +417,16 @@ public final class QuorumController implements Controller {
         OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
         if (latestController.isPresent()) {
             return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
-                latestController.getAsInt());
+                latestController.getAsInt() + ".");
+        } else {
+            return new NotControllerException("No controller appears to be active.");
+        }
+    }
+
+    private NotControllerException newPreMigrationException() {
+        OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
+        if (latestController.isPresent()) {
+            return new NotControllerException("The controller is in pre-migration mode.");
         } else {
             return new NotControllerException("No controller appears to be active.");
         }
@@ -593,6 +605,26 @@ public final class QuorumController implements Controller {
         return event.future();
     }
 
+    enum ControllerOperationFlag {
+        /**
+         * A flag that signifies that this operation should not update the event queue time metric.
+         * We use this when the event was not appended to the queue.
+         */
+        DOES_NOT_UPDATE_QUEUE_TIME,
+
+        /**
+         * A flag that signifies that this operation can be processed when in pre-migration mode.
+         * Operations without this flag will always return NOT_CONTROLLER when invoked in premigration
+         * mode.
+         * <p>
+         * In pre-migration mode, we are still waiting to load the metadata from Apache
+         * ZooKeeper into the metadata log. Therefore, the metadata log is mostly empty,
+         * even though the cluster really does have metadata. Very few operations should
+         * use this flag.
+         */
+        RUNS_IN_PREMIGRATION
+    }
+
     interface ControllerWriteOperation<T> {
         /**
          * Generate the metadata records needed to implement this controller write
@@ -630,19 +662,19 @@ public final class QuorumController implements Controller {
         private final CompletableFuture<T> future;
         private final ControllerWriteOperation<T> op;
         private final long eventCreatedTimeNs = time.nanoseconds();
-        private final boolean deferred;
+        private final EnumSet<ControllerOperationFlag> flags;
         private OptionalLong startProcessingTimeNs = OptionalLong.empty();
         private ControllerResultAndOffset<T> resultAndOffset;
 
-        ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
-            this(name, op, false);
-        }
-
-        ControllerWriteEvent(String name, ControllerWriteOperation<T> op, boolean deferred) {
+        ControllerWriteEvent(
+            String name,
+            ControllerWriteOperation<T> op,
+            EnumSet<ControllerOperationFlag> flags
+        ) {
             this.name = name;
             this.future = new CompletableFuture<T>();
             this.op = op;
-            this.deferred = deferred;
+            this.flags = flags;
             this.resultAndOffset = null;
         }
 
@@ -653,15 +685,19 @@ public final class QuorumController implements Controller {
         @Override
         public void run() throws Exception {
             long now = time.nanoseconds();
-            if (!deferred) {
+            if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
                 // We exclude deferred events from the event queue time metric to prevent
                 // incorrectly including the deferral time in the queue time.
                 controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
             }
             int controllerEpoch = curClaimEpoch;
-            if (!isActiveController()) {
+            if (!isActiveController(controllerEpoch)) {
                 throw newNotControllerException();
             }
+            if (featureControl.inPreMigrationMode() && !flags.contains(RUNS_IN_PREMIGRATION)) {
+                log.info("Cannot run write operation {} in pre-migration mode. Returning NOT_CONTROLLER.", name);
+                throw newPreMigrationException();
+            }
             startProcessingTimeNs = OptionalLong.of(now);
             ControllerResult<T> result = op.generateRecordsAndResult();
             if (result.records().isEmpty()) {
@@ -820,10 +856,21 @@ public final class QuorumController implements Controller {
         }
     }
 
-    <T> CompletableFuture<T> appendWriteEvent(String name,
-                                              OptionalLong deadlineNs,
-                                              ControllerWriteOperation<T> op) {
-        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
+    <T> CompletableFuture<T> appendWriteEvent(
+        String name,
+        OptionalLong deadlineNs,
+        ControllerWriteOperation<T> op
+    ) {
+        return appendWriteEvent(name, deadlineNs, op, EnumSet.noneOf(ControllerOperationFlag.class));
+    }
+
+    <T> CompletableFuture<T> appendWriteEvent(
+        String name,
+        OptionalLong deadlineNs,
+        ControllerWriteOperation<T> op,
+        EnumSet<ControllerOperationFlag> flags
+    ) {
+        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, flags);
         if (deadlineNs.isPresent()) {
             queue.appendWithDeadline(deadlineNs.getAsLong(), event);
         } else {
@@ -852,15 +899,8 @@ public final class QuorumController implements Controller {
         }
         @Override
         public void beginMigration() {
+            log.info("Starting ZK Migration");
             // TODO use KIP-868 transaction
-            ControllerWriteEvent<Void> event = new ControllerWriteEvent<>("Begin ZK Migration",
-                new MigrationWriteOperation(
-                    Collections.singletonList(
-                        new ApiMessageAndVersion(
-                            new ZkMigrationStateRecord().setZkMigrationState(ZkMigrationState.PRE_MIGRATION.value()),
-                            ZkMigrationStateRecord.LOWEST_SUPPORTED_VERSION)
-                    )));
-            queue.append(event);
         }
 
         @Override
@@ -871,19 +911,26 @@ public final class QuorumController implements Controller {
                 return future;
             }
             ControllerWriteEvent<Void> batchEvent = new ControllerWriteEvent<>("ZK Migration Batch",
-                new MigrationWriteOperation(recordBatch));
+                new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION));
             queue.append(batchEvent);
             return batchEvent.future;
         }
 
         @Override
-        public OffsetAndEpoch completeMigration() {
-            // TODO write migration record, use KIP-868 transaction
-            return highestMigrationRecordOffset;
+        public CompletableFuture<OffsetAndEpoch> completeMigration() {
+            log.info("Completing ZK Migration");
+            // TODO use KIP-868 transaction
+            ControllerWriteEvent<Void> event = new ControllerWriteEvent<>("Complete ZK Migration",
+                new MigrationWriteOperation(
+                    Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
+                EnumSet.of(RUNS_IN_PREMIGRATION));
+            queue.append(event);
+            return event.future.thenApply(__ -> highestMigrationRecordOffset);
         }
 
         @Override
         public void abortMigration() {
+            fatalFaultHandler.handleFault("Aborting the ZK migration");
             // TODO use KIP-868 transaction
         }
     }
@@ -1087,7 +1134,11 @@ public final class QuorumController implements Controller {
     }
 
     private boolean isActiveController() {
-        return curClaimEpoch != -1;
+        return isActiveController(curClaimEpoch);
+    }
+
+    private static boolean isActiveController(int claimEpoch) {
+        return claimEpoch != -1;
     }
 
     private void updateWriteOffset(long offset) {
@@ -1125,29 +1176,110 @@ public final class QuorumController implements Controller {
             // Prepend the activate event. It is important that this event go at the beginning
             // of the queue rather than the end (hence prepend rather than append). It's also
             // important not to use prepend for anything else, to preserve the ordering here.
-            queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
-                    new CompleteActivationEvent()));
+            ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<>(
+                "completeActivation[" + epoch + "]",
+                new CompleteActivationEvent(),
+                EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)
+            );
+            activationEvent.future.exceptionally(t -> {
+                fatalFaultHandler.handleFault("exception while activating controller", t);
+                return null;
+            });
+            queue.prepend(activationEvent);
         } catch (Throwable e) {
             fatalFaultHandler.handleFault("exception while claiming leadership", e);
         }
     }
 
-    class CompleteActivationEvent implements ControllerWriteOperation<Void> {
-        @Override
-        public ControllerResult<Void> generateRecordsAndResult() throws Exception {
-            List<ApiMessageAndVersion> records = new ArrayList<>();
-            if (logReplayTracker.empty()) {
-                // If no records have been replayed, we need to write out the bootstrap records.
-                // This will include the new metadata.version, as well as things like SCRAM
-                // initialization, etc.
-                log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " +
-                        "at metadata.version {} from {}.", bootstrapMetadata.records().size(),
-                        bootstrapMetadata.metadataVersion(), bootstrapMetadata.source());
-                records.addAll(bootstrapMetadata.records());
-            } else if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) {
+    /**
+     * Generate the set of activation records. Until KIP-868 transactions are supported, these records
+     * are committed to the log as an atomic batch. The records will include the bootstrap metadata records
+     * (including the bootstrap "metadata.version") and may include a ZK migration record.
+     */
+    public static List<ApiMessageAndVersion> generateActivationRecords(
+        Logger log,
+        boolean isLogEmpty,
+        boolean zkMigrationEnabled,
+        BootstrapMetadata bootstrapMetadata,
+        FeatureControlManager featureControl
+    ) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        if (isLogEmpty) {
+            // If no records have been replayed, we need to write out the bootstrap records.
+            // This will include the new metadata.version, as well as things like SCRAM
+            // initialization, etc.
+            log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " +
+                "at metadata.version {} from {}.", bootstrapMetadata.records().size(),
+                bootstrapMetadata.metadataVersion(), bootstrapMetadata.source());
+            records.addAll(bootstrapMetadata.records());
+
+            if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
+                if (zkMigrationEnabled) {
+                    log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until " +
+                        "the ZK metadata has been migrated");
+                    records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+                } else {
+                    log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.");
+                    records.add(ZkMigrationState.NONE.toRecord());
+                }
+            } else {
+                if (zkMigrationEnabled) {
+                    throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() +
+                        " does not support ZK migrations. Cannot continue with ZK migrations enabled.");
+                }
+            }
+        } else {
+            // Logs have been replayed. We need to initialize some things here if upgrading from older KRaft versions
+            if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) {
                 log.info("No metadata.version feature level record was found in the log. " +
-                        "Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION);
+                    "Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION);
+            }
+
+            if (featureControl.metadataVersion().isMigrationSupported()) {
+                log.info("Loaded ZK migration state of {}", featureControl.zkMigrationState());
+                switch (featureControl.zkMigrationState()) {
+                    case NONE:
+                        // Since this is the default state there may or may not be an actual NONE in the log. Regardless,
+                        // it will eventually be persisted in a snapshot, so we don't need to explicitly write it here.
+                        if (zkMigrationEnabled) {
+                            throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode.");
+                        }
+                        break;
+                    case PRE_MIGRATION:
+                        log.warn("Activating pre-migration controller without empty log. There may be a partial migration");
+                        break;
+                    case MIGRATION:
+                        if (!zkMigrationEnabled) {
+                            // This can happen if controller leadership transfers to a controller with migrations enabled
+                            // after another controller had finalized the migration. For example, during a rolling restart
+                            // of the controller quorum during which the migration config is being set to false.
+                            log.warn("Completing the ZK migration since this controller was configured with " +
+                                "'zookeeper.metadata.migration.enable' set to 'false'.");
+                            records.add(ZkMigrationState.POST_MIGRATION.toRecord());
+                        } else {
+                            log.info("Staying in the ZK migration since 'zookeeper.metadata.migration.enable' is still 'true'.");
+                        }
+                        break;
+                    case POST_MIGRATION:
+                        if (zkMigrationEnabled) {
+                            log.info("Ignoring 'zookeeper.metadata.migration.enable' value of 'true' since the ZK migration " +
+                                "has been completed.");
+                        }
+                        break;
+                }
+            } else {
+                if (zkMigrationEnabled) {
+                    throw new RuntimeException("Should not have ZK migrations enabled on a cluster running metadata.version " + featureControl.metadataVersion());
+                }
             }
+        }
+        return records;
+    }
+    class CompleteActivationEvent implements ControllerWriteOperation<Void> {
+        @Override
+        public ControllerResult<Void> generateRecordsAndResult() {
+            List<ApiMessageAndVersion> records = generateActivationRecords(log, logReplayTracker.empty(),
+                zkMigrationEnabled, bootstrapMetadata, featureControl);
             return ControllerResult.atomicOf(records, null);
         }
 
@@ -1205,9 +1337,16 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
-                                                ControllerWriteOperation<T> op) {
-        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, true);
+    private <T> void scheduleDeferredWriteEvent(
+        String name,
+        long deadlineNs,
+        ControllerWriteOperation<T> op,
+        EnumSet<ControllerOperationFlag> flags
+    ) {
+        if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
+            throw new RuntimeException("deferred events should not update the queue time.");
+        }
+        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, flags);
         queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event);
         event.future.exceptionally(e -> {
             if (e instanceof UnknownServerException && e.getCause() != null &&
@@ -1223,7 +1362,7 @@ public final class QuorumController implements Controller {
             log.error("Unexpected exception while executing deferred write event {}. " +
                 "Rescheduling for a minute from now.", name, e);
             scheduleDeferredWriteEvent(name,
-                deadlineNs + NANOSECONDS.convert(1, TimeUnit.MINUTES), op);
+                deadlineNs + NANOSECONDS.convert(1, TimeUnit.MINUTES), op, flags);
             return null;
         });
     }
@@ -1236,13 +1375,15 @@ public final class QuorumController implements Controller {
             cancelMaybeFenceReplicas();
             return;
         }
-        scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
-            ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker();
-            // This following call ensures that if there are multiple brokers that
-            // are currently stale, then fencing for them is scheduled immediately
-            rescheduleMaybeFenceStaleBrokers();
-            return result;
-        });
+        scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs,
+            () -> {
+                ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker();
+                // This following call ensures that if there are multiple brokers that
+                // are currently stale, then fencing for them is scheduled immediately
+                rescheduleMaybeFenceStaleBrokers();
+                return result;
+            },
+            EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION));
     }
 
     private void cancelMaybeFenceReplicas() {
@@ -1280,7 +1421,7 @@ public final class QuorumController implements Controller {
                 // generated by a ControllerWriteEvent have been applied.
 
                 return result;
-            }, true);
+            }, EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
 
             long delayNs = time.nanoseconds();
             if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
@@ -1328,7 +1469,7 @@ public final class QuorumController implements Controller {
                         null
                     );
                 },
-                true
+                EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION)
             );
 
             long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
@@ -1422,7 +1563,7 @@ public final class QuorumController implements Controller {
                 // NoOpRecord is an empty record and doesn't need to be replayed
                 break;
             case ZK_MIGRATION_STATE_RECORD:
-                // TODO handle this
+                featureControl.replay((ZkMigrationStateRecord) message);
                 break;
             default:
                 throw new RuntimeException("Unhandled record type " + type);
@@ -1646,11 +1787,14 @@ public final class QuorumController implements Controller {
 
     private final ZkRecordConsumer zkRecordConsumer;
 
+    private final boolean zkMigrationEnabled;
+
     /**
      * The maximum number of records per batch to allow.
      */
     private final int maxRecordsPerBatch;
 
+
     private QuorumController(
         FaultHandler fatalFaultHandler,
         LogContext logContext,
@@ -1751,6 +1895,7 @@ public final class QuorumController implements Controller {
         this.curClaimEpoch = -1;
         this.needToCompleteAuthorizerLoad = authorizer.isPresent();
         this.zkRecordConsumer = new MigrationRecordConsumer();
+        this.zkMigrationEnabled = zkMigrationEnabled;
         updateWriteOffset(-1);
 
         resetToEmptyState();
@@ -1802,7 +1947,7 @@ public final class QuorumController implements Controller {
         int brokerId
     ) {
         return appendWriteEvent("unregisterBroker", context.deadlineNs(),
-            () -> replicationControl.unregisterBroker(brokerId));
+            () -> replicationControl.unregisterBroker(brokerId), EnumSet.of(RUNS_IN_PREMIGRATION));
     }
 
     @Override
@@ -1977,7 +2122,8 @@ public final class QuorumController implements Controller {
                             maybeUpdateControlledShutdownOffset(brokerId, offset);
                     }
                 }
-            });
+            },
+            EnumSet.of(RUNS_IN_PREMIGRATION));
     }
 
     @Override
@@ -1985,13 +2131,15 @@ public final class QuorumController implements Controller {
         ControllerRequestContext context,
         BrokerRegistrationRequestData request
     ) {
-        return appendWriteEvent("registerBroker", context.deadlineNs(), () -> {
-            ControllerResult<BrokerRegistrationReply> result = clusterControl.
-                registerBroker(request, writeOffset + 1, featureControl.
-                    finalizedFeatures(Long.MAX_VALUE));
-            rescheduleMaybeFenceStaleBrokers();
-            return result;
-        });
+        return appendWriteEvent("registerBroker", context.deadlineNs(),
+            () -> {
+                ControllerResult<BrokerRegistrationReply> result = clusterControl.
+                    registerBroker(request, writeOffset + 1, featureControl.
+                        finalizedFeatures(Long.MAX_VALUE));
+                rescheduleMaybeFenceStaleBrokers();
+                return result;
+            },
+            EnumSet.of(RUNS_IN_PREMIGRATION));
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d627c7b5cd6..2d416d0eea6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.common.ElectionType;
@@ -92,7 +91,6 @@ import org.apache.kafka.metadata.placement.PlacementSpec;
 import org.apache.kafka.metadata.placement.TopicAssignment;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -204,19 +202,12 @@ public class ReplicationControlManager {
             if (configurationControl == null) {
                 throw new IllegalStateException("Configuration control must be set before building");
             } else if (clusterControl == null) {
-                throw new IllegalStateException("Cluster controller must be set before building");
+                throw new IllegalStateException("Cluster control must be set before building");
             }
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
             if (featureControl == null) {
-                featureControl = new FeatureControlManager.Builder().
-                    setLogContext(logContext).
-                    setSnapshotRegistry(snapshotRegistry).
-                    setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
-                        QuorumFeatures.defaultFeatureMap(),
-                        Collections.singletonList(0))).
-                    setMetadataVersion(MetadataVersion.latest()).
-                    build();
+                throw new IllegalStateException("Feature control must be set before building");
             }
             return new ReplicationControlManager(snapshotRegistry,
                 logContext,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
index 07750130fd9..84f2bc1f9bb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
@@ -49,6 +49,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
         "KafkaController", "PreferredReplicaImbalanceCount");
     private final static MetricName METADATA_ERROR_COUNT = getMetricName(
         "KafkaController", "MetadataErrorCount");
+    private final static MetricName ZK_MIGRATION_STATE = getMetricName(
+        "KafkaController", "ZkMigrationState");
 
     private final Optional<MetricsRegistry> registry;
     private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
@@ -58,6 +60,7 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
     private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
     private final AtomicInteger preferredReplicaImbalanceCount = new AtomicInteger(0);
     private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
+    private final AtomicInteger zkMigrationState = new AtomicInteger(-1);
 
     /**
      * Create a new ControllerMetadataMetrics object.
@@ -108,6 +111,12 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
                 return metadataErrorCount();
             }
         }));
+        registry.ifPresent(r -> r.newGauge(ZK_MIGRATION_STATE, new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return (int) zkMigrationState();
+            }
+        }));
     }
 
     public void setFencedBrokerCount(int brokerCount) {
@@ -190,6 +199,14 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
         return this.metadataErrorCount.get();
     }
 
+    public void setZkMigrationState(byte migrationStateValue) {
+        this.zkMigrationState.set(migrationStateValue);
+    }
+
+    public byte zkMigrationState() {
+        return zkMigrationState.byteValue();
+    }
+
     @Override
     public void close() {
         registry.ifPresent(r -> Arrays.asList(
@@ -199,7 +216,8 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
             GLOBAL_PARTITION_COUNT,
             OFFLINE_PARTITION_COUNT,
             PREFERRED_REPLICA_IMBALANCE_COUNT,
-            METADATA_ERROR_COUNT
+            METADATA_ERROR_COUNT,
+            ZK_MIGRATION_STATE
         ).forEach(r::removeMetric));
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
index 504a156fa76..c72bdd9818f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsPublisher.java
@@ -115,6 +115,9 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
             }
         }
         changes.apply(metrics);
+        if (delta.featuresDelta() != null) {
+            delta.featuresDelta().getZkMigrationStateChange().ifPresent(state -> metrics.setZkMigrationState(state.value()));
+        }
     }
 
     private void publishSnapshot(MetadataImage newImage) {
@@ -147,6 +150,7 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
         metrics.setGlobalPartitionCount(totalPartitions);
         metrics.setOfflinePartitionCount(offlinePartitions);
         metrics.setPreferredReplicaImbalanceCount(partitionsWithoutPreferredLeader);
+        metrics.setZkMigrationState(newImage.features().zkMigrationState().value());
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 7f431c2d061..66e371835fa 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
@@ -36,6 +38,8 @@ public final class FeaturesDelta {
 
     private MetadataVersion metadataVersionChange = null;
 
+    private ZkMigrationState zkMigrationStateChange = null;
+
     public FeaturesDelta(FeaturesImage image) {
         this.image = image;
     }
@@ -44,6 +48,10 @@ public final class FeaturesDelta {
         return changes;
     }
 
+    public Optional<ZkMigrationState> getZkMigrationStateChange() {
+        return Optional.ofNullable(zkMigrationStateChange);
+    }
+
     public Optional<MetadataVersion> metadataVersionChange() {
         return Optional.ofNullable(metadataVersionChange);
     }
@@ -68,6 +76,10 @@ public final class FeaturesDelta {
         }
     }
 
+    public void replay(ZkMigrationStateRecord record) {
+        this.zkMigrationStateChange = ZkMigrationState.of(record.zkMigrationState());
+    }
+
     public FeaturesImage apply() {
         Map<String, Short> newFinalizedVersions =
             new HashMap<>(image.finalizedVersions().size());
@@ -96,7 +108,14 @@ public final class FeaturesDelta {
         } else {
             metadataVersion = metadataVersionChange;
         }
-        return new FeaturesImage(newFinalizedVersions, metadataVersion);
+
+        final ZkMigrationState zkMigrationState;
+        if (zkMigrationStateChange == null) {
+            zkMigrationState = image.zkMigrationState();
+        } else {
+            zkMigrationState = zkMigrationStateChange;
+        }
+        return new FeaturesImage(newFinalizedVersions, metadataVersion, zkMigrationState);
     }
 
     @Override
@@ -104,6 +123,7 @@ public final class FeaturesDelta {
         return "FeaturesDelta(" +
             "changes=" + changes +
             ", metadataVersionChange=" + metadataVersionChange +
+            ", zkMigrationStateChange=" + zkMigrationStateChange +
             ')';
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index 175bf464784..623b45a8dba 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -20,6 +20,7 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.ArrayList;
@@ -27,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -37,19 +39,32 @@ import java.util.stream.Collectors;
  * This class is thread-safe.
  */
 public final class FeaturesImage {
-    public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.MINIMUM_KRAFT_VERSION);
+    public static final FeaturesImage EMPTY = new FeaturesImage(
+        Collections.emptyMap(),
+        MetadataVersion.MINIMUM_KRAFT_VERSION,
+        ZkMigrationState.NONE
+    );
 
     private final Map<String, Short> finalizedVersions;
 
     private final MetadataVersion metadataVersion;
 
-    public FeaturesImage(Map<String, Short> finalizedVersions, MetadataVersion metadataVersion) {
+    private final ZkMigrationState zkMigrationState;
+
+    public FeaturesImage(
+        Map<String, Short> finalizedVersions,
+        MetadataVersion metadataVersion,
+        ZkMigrationState zkMigrationState
+    ) {
         this.finalizedVersions = Collections.unmodifiableMap(finalizedVersions);
         this.metadataVersion = metadataVersion;
+        this.zkMigrationState = zkMigrationState;
     }
 
     public boolean isEmpty() {
-        return finalizedVersions.isEmpty();
+        return finalizedVersions.isEmpty() &&
+            metadataVersion.equals(MetadataVersion.MINIMUM_KRAFT_VERSION) &&
+            zkMigrationState.equals(ZkMigrationState.NONE);
     }
 
     public MetadataVersion metadataVersion() {
@@ -60,6 +75,10 @@ public final class FeaturesImage {
         return finalizedVersions;
     }
 
+    public ZkMigrationState zkMigrationState() {
+        return zkMigrationState;
+    }
+
     private Optional<Short> finalizedVersion(String feature) {
         return Optional.ofNullable(finalizedVersions.get(feature));
     }
@@ -70,6 +89,14 @@ public final class FeaturesImage {
         } else {
             writeFeatureLevels(writer, options);
         }
+
+        if (options.metadataVersion().isMigrationSupported()) {
+            writer.write(0, zkMigrationState.toRecord().message());
+        } else {
+            if (!zkMigrationState.equals(ZkMigrationState.NONE)) {
+                options.handleLoss("the ZK Migration state which was " + zkMigrationState);
+            }
+        }
     }
 
     private void handleFeatureLevelNotSupported(ImageWriterOptions options) {
@@ -105,14 +132,16 @@ public final class FeaturesImage {
 
     @Override
     public int hashCode() {
-        return finalizedVersions.hashCode();
+        return Objects.hash(finalizedVersions, metadataVersion, zkMigrationState);
     }
 
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof FeaturesImage)) return false;
         FeaturesImage other = (FeaturesImage) o;
-        return finalizedVersions.equals(other.finalizedVersions);
+        return finalizedVersions.equals(other.finalizedVersions) &&
+            metadataVersion.equals(other.metadataVersion) &&
+            zkMigrationState.equals(other.zkMigrationState);
     }
 
 
@@ -121,6 +150,7 @@ public final class FeaturesImage {
         return "FeaturesImage{" +
                 "finalizedVersions=" + finalizedVersions +
                 ", metadataVersion=" + metadataVersion +
+                ", zkMigrationState=" + zkMigrationState +
                 '}';
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 05fc879bfe9..7b47d54e307 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.server.common.MetadataVersion;
 
@@ -226,7 +227,7 @@ public final class MetadataDelta {
                  */
                 break;
             case ZK_MIGRATION_STATE_RECORD:
-                // TODO handle this
+                replay((ZkMigrationStateRecord) record);
                 break;
             default:
                 throw new RuntimeException("Unknown metadata record type " + type);
@@ -312,6 +313,10 @@ public final class MetadataDelta {
         getOrCreateScramDelta().replay(record);
     }
 
+    public void replay(ZkMigrationStateRecord record) {
+        getOrCreateFeaturesDelta().replay(record);
+    }
+
     /**
      * Create removal deltas for anything which was in the base image, but which was not
      * referenced in the snapshot records we just applied.
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 1534e1b9509..879fc095850 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -36,6 +36,8 @@ import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.util.Deadline;
+import org.apache.kafka.server.util.FutureUtils;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -47,8 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -58,13 +60,21 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
- * This class orchestrates and manages the state related to a ZK to KRaft migration. An event thread is used to
+ * This class orchestrates and manages the state related to a ZK to KRaft migration. A single event thread is used to
  * serialize events coming from various threads and listeners.
  */
 public class KRaftMigrationDriver implements MetadataPublisher {
     private final static Consumer<Throwable> NO_OP_HANDLER = ex -> { };
 
+    /**
+     * When waiting for the metadata layer to commit batches, we block the migration driver thread for this
+     * amount of time. A large value is selected to avoid timeouts in the common case, but prevent us from
+     * blocking indefinitely.
+     */
+    private final static int METADATA_COMMIT_MAX_WAIT_MS = 300_000;
+
     private final Time time;
+    private final LogContext logContext;
     private final Logger log;
     private final int nodeId;
     private final MigrationClient zkMigrationClient;
@@ -81,6 +91,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     private volatile MigrationDriverState migrationState;
     private volatile ZkMigrationLeadershipState migrationLeadershipState;
     private volatile MetadataImage image;
+    private volatile boolean firstPublish;
 
     public KRaftMigrationDriver(
         int nodeId,
@@ -95,12 +106,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         this.zkMigrationClient = zkMigrationClient;
         this.propagator = propagator;
         this.time = Time.SYSTEM;
-        LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
-        this.log = logContext.logger(KRaftMigrationDriver.class);
+        this.logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
+        this.log = this.logContext.logger(KRaftMigrationDriver.class);
         this.migrationState = MigrationDriverState.UNINITIALIZED;
         this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
         this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, "controller-" + nodeId + "-migration-driver-");
         this.image = MetadataImage.EMPTY;
+        this.firstPublish = false;
         this.leaderAndEpoch = LeaderAndEpoch.UNKNOWN;
         this.initialZkLoadHandler = initialZkLoadHandler;
         this.faultHandler = faultHandler;
@@ -123,19 +135,18 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         return stateFuture;
     }
 
-    private void initializeMigrationState() {
-        log.info("Recovering migration state");
+    private void recoverMigrationStateFromZK() {
+        log.info("Recovering migration state from ZK");
         apply("Recovery", zkMigrationClient::getOrCreateMigrationRecoveryState);
         String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
         log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);
+
+        // Once we've recovered the migration state from ZK, install this class as a metadata publisher
+        // by calling the initialZkLoadHandler.
         initialZkLoadHandler.accept(this);
-        // Let's transition to INACTIVE state and wait for leadership events.
-        transitionTo(MigrationDriverState.INACTIVE);
-    }
 
-    private boolean isControllerQuorumReadyForMigration() {
-        // TODO implement this
-        return true;
+        // Transition to INACTIVE state and wait for leadership events.
+        transitionTo(MigrationDriverState.INACTIVE);
     }
 
     private boolean imageDoesNotContainAllBrokers(MetadataImage image, Set<Integer> brokerIds) {
@@ -148,14 +159,25 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     }
 
     private boolean areZkBrokersReadyForMigration() {
-        if (image == MetadataImage.EMPTY) {
-            // TODO maybe add WAIT_FOR_INITIAL_METADATA_PUBLISH state to avoid this kind of check?
+        if (!firstPublish) {
             log.info("Waiting for initial metadata publish before checking if Zk brokers are registered.");
             return false;
         }
 
-        // First check the brokers registered in ZK
+        if (image.cluster().isEmpty()) {
+            // This primarily happens in system tests when we are starting a new ZK cluster and KRaft quorum
+            // around the same time.
+            log.info("No brokers are known to KRaft, waiting for brokers to register.");
+            return false;
+        }
+
         Set<Integer> zkBrokerRegistrations = zkMigrationClient.readBrokerIds();
+        if (zkBrokerRegistrations.isEmpty()) {
+            // Similar to the above empty check
+            log.info("No brokers are registered in ZK, waiting for brokers to register.");
+            return false;
+        }
+
         if (imageDoesNotContainAllBrokers(image, zkBrokerRegistrations)) {
             log.info("Still waiting for ZK brokers {} to register with KRaft.", zkBrokerRegistrations);
             return false;
@@ -187,6 +209,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
     private boolean isValidStateChange(MigrationDriverState newState) {
         if (migrationState == newState)
             return true;
+
+        if (newState == MigrationDriverState.UNINITIALIZED) {
+            return false;
+        }
+
         switch (migrationState) {
             case UNINITIALIZED:
             case DUAL_WRITE:
@@ -196,6 +223,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             case WAIT_FOR_CONTROLLER_QUORUM:
                 return
                     newState == MigrationDriverState.INACTIVE ||
+                    newState == MigrationDriverState.BECOME_CONTROLLER ||
                     newState == MigrationDriverState.WAIT_FOR_BROKERS;
             case WAIT_FOR_BROKERS:
                 return
@@ -222,23 +250,16 @@ public class KRaftMigrationDriver implements MetadataPublisher {
 
     private void transitionTo(MigrationDriverState newState) {
         if (!isValidStateChange(newState)) {
-            log.error("Error transition in migration driver from {} to {}", migrationState, newState);
-            return;
+            throw new IllegalStateException(
+                String.format("Invalid transition in migration driver from %s to %s", migrationState, newState));
         }
+
         if (newState != migrationState) {
             log.debug("{} transitioning from {} to {} state", nodeId, migrationState, newState);
         } else {
             log.trace("{} transitioning from {} to {} state", nodeId, migrationState, newState);
         }
-        switch (newState) {
-            case UNINITIALIZED:
-                // No state can transition to UNITIALIZED.
-                throw new IllegalStateException("Illegal transition from " + migrationState + " to " + newState + " " +
-                "state in Zk to KRaft migration");
-            case INACTIVE:
-                // Any state can go to INACTIVE.
-                break;
-        }
+
         migrationState = newState;
     }
 
@@ -319,11 +340,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         public void run() throws Exception {
             switch (migrationState) {
                 case UNINITIALIZED:
-                    initializeMigrationState();
+                    recoverMigrationStateFromZK();
                     break;
                 case INACTIVE:
-                    // Nothing to do when the driver is inactive. We need to wait on the
-                    // controller node's state to move forward.
+                    // Nothing to do when the driver is inactive. We must wait until a KRaftLeaderEvent
+                    // informs us that we are the leader.
                     break;
                 case WAIT_FOR_CONTROLLER_QUORUM:
                     eventQueue.append(new WaitForControllerQuorumEvent());
@@ -355,6 +376,11 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
+    /**
+     * An event generated by a call to {@link MetadataPublisher#onControllerChange}. This will not be called until
+     * this class is registered with {@link org.apache.kafka.image.loader.MetadataLoader}. The registration happens
+     * after the migration state is loaded from ZooKeeper in {@link #recoverMigrationStateFromZK}.
+     */
     class KRaftLeaderEvent extends MigrationEvent {
         private final LeaderAndEpoch leaderAndEpoch;
 
@@ -367,27 +393,21 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             // We can either be the active controller or just resigned from being the controller.
             KRaftMigrationDriver.this.leaderAndEpoch = leaderAndEpoch;
             boolean isActive = leaderAndEpoch.isLeader(KRaftMigrationDriver.this.nodeId);
-            switch (migrationState) {
-                case UNINITIALIZED:
-                    // Poll and retry after initialization
-                    long deadline = time.nanoseconds() + NANOSECONDS.convert(10, SECONDS);
-                    eventQueue.scheduleDeferred(
-                        "poll",
-                        new EventQueue.DeadlineFunction(deadline),
-                        this);
-                    break;
-                default:
-                    if (!isActive) {
-                        apply("KRaftLeaderEvent is not active", state -> ZkMigrationLeadershipState.EMPTY);
-                        transitionTo(MigrationDriverState.INACTIVE);
-                    } else {
-                        // Apply the new KRaft state
-                        apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch()));
-                        // Before becoming the controller fo ZkBrokers, we need to make sure the
-                        // Controller Quorum can handle migration.
-                        transitionTo(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM);
-                    }
-                    break;
+
+            if (!isActive) {
+                apply("KRaftLeaderEvent is not active", state ->
+                    state.withNewKRaftController(
+                        leaderAndEpoch.leaderId().orElse(ZkMigrationLeadershipState.EMPTY.kraftControllerId()),
+                        leaderAndEpoch.epoch())
+                );
+                transitionTo(MigrationDriverState.INACTIVE);
+            } else {
+                // Apply the new KRaft state
+                apply("KRaftLeaderEvent is active", state -> state.withNewKRaftController(nodeId, leaderAndEpoch.epoch()));
+
+                // Before becoming the controller for ZK brokers, we need to make sure the
+                // Controller Quorum can handle migration.
+                transitionTo(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM);
             }
         }
     }
@@ -396,18 +416,41 @@ public class KRaftMigrationDriver implements MetadataPublisher {
 
         @Override
         public void run() throws Exception {
-            switch (migrationState) {
-                case WAIT_FOR_CONTROLLER_QUORUM:
-                    if (isControllerQuorumReadyForMigration()) {
-                        log.debug("Controller Quorum is ready for Zk to KRaft migration");
-                        // Note that leadership would not change here. Hence we do not need to
-                        // `apply` any leadership state change.
+            if (migrationState.equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
+                if (!firstPublish) {
+                    log.trace("Waiting until we have received metadata before proceeding with migration");
+                    return;
+                }
+
+                ZkMigrationState zkMigrationState = image.features().zkMigrationState();
+                switch (zkMigrationState) {
+                    case NONE:
+                        // This error message is used in zookeeper_migration_test.py::TestMigration.test_pre_migration_mode_3_4
+                        log.error("The controller's ZkMigrationState is NONE which means this cluster should not be migrated from ZooKeeper. " +
+                            "This controller should not be configured with 'zookeeper.metadata.migration.enable' set to true. " +
+                            "Will not proceed with a migration.");
+                        transitionTo(MigrationDriverState.INACTIVE);
+                        break;
+                    case PRE_MIGRATION:
+                        // Base case when starting the migration
+                        log.debug("Controller Quorum is ready for Zk to KRaft migration. Now waiting for ZK brokers.");
                         transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
-                    }
-                    break;
-                default:
-                    // Ignore the event as we're not trying to become controller anymore.
-                    break;
+                        break;
+                    case MIGRATION:
+                        if (!migrationLeadershipState.zkMigrationComplete()) {
+                            log.error("KRaft controller indicates an active migration, but the ZK state does not.");
+                            transitionTo(MigrationDriverState.INACTIVE);
+                        } else {
+                            // Base case when rebooting a controller during migration
+                            log.debug("Migration is in already progress, not waiting on ZK brokers.");
+                            transitionTo(MigrationDriverState.BECOME_CONTROLLER);
+                        }
+                        break;
+                    case POST_MIGRATION:
+                        log.error("KRaft controller indicates a completed migration, but the migration driver is somehow active.");
+                        transitionTo(MigrationDriverState.INACTIVE);
+                        break;
+                }
             }
         }
     }
@@ -463,15 +506,19 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                             log.info("Migrating {} records from ZK", batch.size());
                         }
                         CompletableFuture<?> future = zkRecordConsumer.acceptBatch(batch);
+                        FutureUtils.waitWithLogging(KRaftMigrationDriver.this.log, KRaftMigrationDriver.this.logContext.logPrefix(),
+                            "the metadata layer to commit migration record batch",
+                            future, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
                         count.addAndGet(batch.size());
-                        future.get();
-                    } catch (InterruptedException e) {
+                    } catch (Throwable e) {
                         throw new RuntimeException(e);
-                    } catch (ExecutionException e) {
-                        throw new RuntimeException(e.getCause());
                     }
                 }, brokersInMetadata::add);
-                OffsetAndEpoch offsetAndEpochAfterMigration = zkRecordConsumer.completeMigration();
+                CompletableFuture<OffsetAndEpoch> completeMigrationFuture = zkRecordConsumer.completeMigration();
+                OffsetAndEpoch offsetAndEpochAfterMigration = FutureUtils.waitWithLogging(
+                    KRaftMigrationDriver.this.log, KRaftMigrationDriver.this.logContext.logPrefix(),
+                    "the metadata layer to complete the migration",
+                    completeMigrationFuture, Deadline.fromDelay(time, METADATA_COMMIT_MAX_WAIT_MS, TimeUnit.MILLISECONDS), time);
                 log.info("Completed migration of metadata from Zookeeper to KRaft. A total of {} metadata records were " +
                          "generated. The current metadata offset is now {} with an epoch of {}. Saw {} brokers in the " +
                          "migrated metadata {}.",
@@ -483,7 +530,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 ZkMigrationLeadershipState newState = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
                     offsetAndEpochAfterMigration.offset(),
                     offsetAndEpochAfterMigration.epoch());
-                apply("Migrate metadata from Zk", state -> zkMigrationClient.setMigrationRecoveryState(newState));
+                apply("Finished migrating ZK data", state -> zkMigrationClient.setMigrationRecoveryState(newState));
                 transitionTo(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM);
             } catch (Throwable t) {
                 zkRecordConsumer.abortMigration();
@@ -535,6 +582,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
 
         @Override
         public void run() throws Exception {
+            KRaftMigrationDriver.this.firstPublish = true;
             MetadataImage prevImage = KRaftMigrationDriver.this.image;
             KRaftMigrationDriver.this.image = image;
             String metadataType = isSnapshot ? "snapshot" : "delta";
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
index f8404cfe5d1..8d5e584831c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.metadata.migration;
 
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
 import java.util.Optional;
 
 /**
  * The cluster-wide ZooKeeper migration state.
- *
+ * <p>
  * An enumeration of the possible states of the ZkMigrationState field in ZkMigrationStateRecord.
  * This information is persisted in the metadata log and image.
  *
@@ -29,7 +32,8 @@ import java.util.Optional;
 public enum ZkMigrationState {
     /**
      * The cluster was created in KRaft mode. A cluster that was created in ZK mode can never attain
-     * this state; the endpoint of migration is POST_MIGRATION, instead.
+     * this state; the endpoint of migration is POST_MIGRATION, instead. This value is also used as
+     * the default migration state in an empty metadata log.
      */
     NONE((byte) 0),
 
@@ -38,15 +42,20 @@ public enum ZkMigrationState {
      * The controller is now awaiting the preconditions for starting the migration to KRaft. In this
      * state, the metadata log does not yet contain the cluster's data. There is a metadata quorum,
      * but it is not doing anything useful yet.
+     * <p>
+     * In Kafka 3.4, PRE_MIGRATION was written out as value 1 to the log, but no MIGRATION state
+     * was ever written. Since this would be an invalid log state in 3.5+, we have swapped the
+     * enum values for PRE_MIGRATION and MIGRATION. This allows us to handle the upgrade case
+     * from 3.4 without adding additional fields to the migration record.
      */
-    PRE_MIGRATION((byte) 1),
+    PRE_MIGRATION((byte) 2),
 
     /**
      * The ZK data has been migrated, and the KRaft controller is now writing metadata to both ZK
-     * and the metadata log. The controller will remain in this state until all of the brokers have
+     * and the metadata log. The controller will remain in this state until all the brokers have
      * been restarted in KRaft mode.
      */
-    MIGRATION((byte) 2),
+    MIGRATION((byte) 1),
 
     /**
      * The migration from ZK has been fully completed. The cluster is running in KRaft mode. This state
@@ -65,6 +74,13 @@ public enum ZkMigrationState {
         return value;
     }
 
+    public ApiMessageAndVersion toRecord() {
+        return new ApiMessageAndVersion(
+            new ZkMigrationStateRecord().setZkMigrationState(value()),
+            (short) 0
+        );
+    }
+
     public static ZkMigrationState of(byte value) {
         return optionalOf(value)
             .orElseThrow(() -> new IllegalArgumentException(String.format("Value %s is not a valid Zk migration state", value)));
@@ -78,4 +94,8 @@ public enum ZkMigrationState {
         }
         return Optional.empty();
     }
+
+    public boolean inProgress() {
+        return this == PRE_MIGRATION || this == MIGRATION;
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java
index 4e35b719d14..00ffdcb4abb 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkRecordConsumer.java
@@ -25,6 +25,6 @@ import java.util.concurrent.CompletableFuture;
 public interface ZkRecordConsumer {
     void beginMigration();
     CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch);
-    OffsetAndEpoch completeMigration();
+    CompletableFuture<OffsetAndEpoch> completeMigration();
     void abortMigration();
 }
diff --git a/metadata/src/main/resources/common/metadata/ZkMigrationRecord.json b/metadata/src/main/resources/common/metadata/ZkMigrationRecord.json
index 65910dc6a45..aaaed4f4a08 100644
--- a/metadata/src/main/resources/common/metadata/ZkMigrationRecord.json
+++ b/metadata/src/main/resources/common/metadata/ZkMigrationRecord.json
@@ -18,7 +18,12 @@
   "type": "metadata",
   "name": "ZkMigrationStateRecord",
   // Version 0 adds ZkMigrationState which is used by the KRaft controller to mark the beginning and end
-  // of the ZK to KRaft migration. Possible values are 1 (PreMigration), 2 (Migration), 3 (PostMigration).
+  // of the ZK to KRaft migration.
+  //
+  // In 3.4, the defined values are: 0 (None), 1 (PreMigration), 2 (Migration), 3 (PostMigration).
+  // In 3.5, the values for PreMigration and Migration were swapped: 0 (None), 2 (PreMigration), 1 (Migration), 3 (PostMigration).
+  //   This was done to work around the fact that we never wrote Migration or PostMigration records in 3.4
+  //
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 8345b4e3149..bd9483ff5d8 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -225,10 +227,10 @@ public class FeatureControlManagerTest {
     }
 
     private static final FeatureControlManager.Builder TEST_MANAGER_BUILDER1 =
-            new FeatureControlManager.Builder().
-                    setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
-                            MetadataVersion.IBP_3_3_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())).
-                    setMetadataVersion(MetadataVersion.IBP_3_3_IV2);
+        new FeatureControlManager.Builder().
+            setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
+                MetadataVersion.IBP_3_3_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())).
+            setMetadataVersion(MetadataVersion.IBP_3_3_IV2);
 
     @Test
     public void testApplyMetadataVersionChangeRecord() {
@@ -355,7 +357,8 @@ public class FeatureControlManagerTest {
                 setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
                         MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV1.featureLevel())).
                 setMetadataVersion(MetadataVersion.IBP_3_1_IV0).
-                setMinimumBootstrapVersion(MetadataVersion.IBP_3_0_IV0).build();
+                setMinimumBootstrapVersion(MetadataVersion.IBP_3_0_IV0).
+                build();
         assertEquals(ControllerResult.of(Collections.emptyList(),
                         singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
                 manager.updateFeatures(
@@ -369,8 +372,9 @@ public class FeatureControlManagerTest {
     public void testCanotDowngradeBefore3_3_IV0() {
         FeatureControlManager manager = new FeatureControlManager.Builder().
             setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
-                    MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())).
-                    setMetadataVersion(MetadataVersion.IBP_3_3_IV0).build();
+                MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())).
+            setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
+            build();
         assertEquals(ControllerResult.of(Collections.emptyList(),
                         singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
                         "Invalid metadata.version 3. Unable to set a metadata.version less than 3.3-IV0"))),
@@ -412,4 +416,45 @@ public class FeatureControlManagerTest {
         RecordTestUtils.replayAll(manager, result2.records());
         assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
     }
+
+    @Test
+    public void testNoMetadataVersionChangeDuringMigration() {
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
+                    MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_5_IV1.featureLevel())).
+            setMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+            build();
+        BootstrapMetadata bootstrapMetadata = BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "FeatureControlManagerTest");
+        RecordTestUtils.replayAll(manager, bootstrapMetadata.records());
+        RecordTestUtils.replayOne(manager, ZkMigrationState.PRE_MIGRATION.toRecord());
+
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+            singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "Invalid metadata.version 10. Unable to modify metadata.version while a ZK migration is in progress."))),
+            manager.updateFeatures(
+                singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
+                singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
+                emptyMap(),
+                true));
+
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+                singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "Invalid metadata.version 4. Unable to modify metadata.version while a ZK migration is in progress."))),
+            manager.updateFeatures(
+                singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
+                singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+                emptyMap(),
+                true));
+
+        // Complete the migration
+        RecordTestUtils.replayOne(manager, ZkMigrationState.POST_MIGRATION.toRecord());
+        ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
+            singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
+            singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
+            emptyMap(),
+            false);
+        assertEquals(Errors.NONE, result.response().get(MetadataVersion.FEATURE_NAME).error());
+        RecordTestUtils.replayAll(manager, result.records());
+        assertEquals(MetadataVersion.IBP_3_5_IV1, manager.metadataVersion());
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 1894a41fcf2..4f3a39b118d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -49,8 +49,10 @@ import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
 import org.apache.kafka.common.requests.AlterPartitionRequest;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigResource;
@@ -92,18 +94,22 @@ import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.metalog.LocalLogManager;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandlerException;
 import org.apache.kafka.snapshot.FileRawSnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.Snapshots;
 import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -244,7 +250,7 @@ public class QuorumControllerTest {
             new ResultOrError<>(Collections.emptyMap())),
             controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
                 BROKER0, Collections.emptyList())).get());
-        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(3L));
+        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(4L));
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
     }
 
@@ -556,7 +562,7 @@ public class QuorumControllerTest {
                     setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
                     setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)).
                     setListeners(listeners));
-            assertEquals(2L, reply.get().epoch());
+            assertEquals(3L, reply.get().epoch());
             CreateTopicsRequestData createTopicsRequestData =
                 new CreateTopicsRequestData().setTopics(
                     new CreatableTopicCollection(Collections.singleton(
@@ -572,7 +578,7 @@ public class QuorumControllerTest {
                         get().topics().find("foo").errorMessage());
             assertEquals(new BrokerHeartbeatReply(true, false, false, false),
                 active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                        setWantFence(false).setBrokerEpoch(2L).setBrokerId(0).
+                        setWantFence(false).setBrokerEpoch(3L).setBrokerId(0).
                         setCurrentMetadataOffset(100000L)).get());
             assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
                 createTopicsRequestData, Collections.singleton("foo")).
@@ -1338,4 +1344,181 @@ public class QuorumControllerTest {
                         2,
                         appender)).getMessage());
     }
+
+    @Test
+    public void testBootstrapZkMigrationRecord() throws Exception {
+        assertEquals(ZkMigrationState.PRE_MIGRATION,
+            checkBootstrapZkMigrationRecord(MetadataVersion.IBP_3_4_IV0, true));
+
+        assertEquals(ZkMigrationState.NONE,
+            checkBootstrapZkMigrationRecord(MetadataVersion.IBP_3_4_IV0, false));
+
+        assertEquals(ZkMigrationState.NONE,
+            checkBootstrapZkMigrationRecord(MetadataVersion.IBP_3_3_IV0, false));
+
+        assertEquals(
+            "The bootstrap metadata.version 3.3-IV0 does not support ZK migrations. Cannot continue with ZK migrations enabled.",
+            assertThrows(FaultHandlerException.class, () ->
+                checkBootstrapZkMigrationRecord(MetadataVersion.IBP_3_3_IV0, true)).getCause().getCause().getMessage()
+        );
+    }
+
+    public ZkMigrationState checkBootstrapZkMigrationRecord(
+        MetadataVersion metadataVersion,
+        boolean migrationEnabled
+    ) throws Exception {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setZkMigrationEnabled(migrationEnabled);
+                }).
+                setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test")).
+                build();
+        ) {
+            QuorumController active = controlEnv.activeController();
+            return active.appendReadEvent("read migration state", OptionalLong.empty(),
+                () -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testUpgradeMigrationStateFrom34() throws Exception {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build()) {
+            // In 3.4, we only wrote a PRE_MIGRATION to the log. In that software version, we defined this
+            // as enum value 1. In 3.5+ software, this enum value is redefined as MIGRATION
+            BootstrapMetadata bootstrapMetadata = BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_4_IV0, "test");
+            List<ApiMessageAndVersion> initialRecords = new ArrayList<>(bootstrapMetadata.records());
+            initialRecords.add(ZkMigrationState.of((byte) 1).toRecord());
+            logEnv.appendInitialRecords(initialRecords);
+            try (
+                QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                    setControllerBuilderInitializer(controllerBuilder -> {
+                        controllerBuilder.setZkMigrationEnabled(true);
+                    }).
+                    setBootstrapMetadata(bootstrapMetadata).
+                    build();
+            ) {
+                QuorumController active = controlEnv.activeController();
+                assertEquals(active.featureControl().zkMigrationState(), ZkMigrationState.MIGRATION);
+                assertFalse(active.featureControl().inPreMigrationMode());
+            }
+        }
+    }
+
+    FeatureControlManager getActivationRecords(
+            MetadataVersion metadataVersion,
+            Optional<ZkMigrationState> stateInLog,
+            boolean zkMigrationEnabled
+    ) {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControlManager = new FeatureControlManager.Builder()
+                .setSnapshotRegistry(snapshotRegistry)
+                .setMetadataVersion(metadataVersion)
+                .build();
+
+        stateInLog.ifPresent(zkMigrationState ->
+            featureControlManager.replay((ZkMigrationStateRecord) zkMigrationState.toRecord().message()));
+
+        List<ApiMessageAndVersion> records = QuorumController.generateActivationRecords(
+            log,
+            !stateInLog.isPresent(),
+            zkMigrationEnabled,
+            BootstrapMetadata.fromVersion(metadataVersion, "test"),
+            featureControlManager);
+        RecordTestUtils.replayAll(featureControlManager, records);
+        return featureControlManager;
+    }
+
+    @Test
+    public void testActivationRecords33() {
+        FeatureControlManager featureControl;
+
+        assertEquals(
+            "The bootstrap metadata.version 3.3-IV0 does not support ZK migrations. Cannot continue with ZK migrations enabled.",
+            assertThrows(RuntimeException.class, () -> getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.empty(), true)).getMessage()
+        );
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.empty(), false);
+        assertEquals(MetadataVersion.IBP_3_3_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
+
+        assertEquals(
+            "Should not have ZK migrations enabled on a cluster running metadata.version 3.3-IV0",
+            assertThrows(RuntimeException.class, () -> getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.of(ZkMigrationState.NONE), true)).getMessage()
+        );
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV0, Optional.of(ZkMigrationState.NONE), false);
+        assertEquals(MetadataVersion.IBP_3_3_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
+    }
+
+    @Test
+    public void testActivationRecords34() {
+        FeatureControlManager featureControl;
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), false);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
+
+        assertEquals(
+            "Should not have ZK migrations enabled on a cluster that was created in KRaft mode.",
+            assertThrows(RuntimeException.class, () -> getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.NONE), true)).getMessage()
+        );
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.NONE), false);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.NONE, featureControl.zkMigrationState());
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.PRE_MIGRATION), true);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.MIGRATION), true);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.MIGRATION, featureControl.zkMigrationState());
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.MIGRATION), false);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.POST_MIGRATION, featureControl.zkMigrationState());
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.POST_MIGRATION), true);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.POST_MIGRATION, featureControl.zkMigrationState());
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.of(ZkMigrationState.POST_MIGRATION), false);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.POST_MIGRATION, featureControl.zkMigrationState());
+    }
+
+    @Test
+    public void testActivationRecordsNonEmptyLog() {
+        FeatureControlManager featureControl;
+
+        featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
+        assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
+        assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());    }
+
+    @Test
+    public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
+        ) {
+            QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
+                    setControllerBuilderInitializer(controllerBuilder -> {
+                        controllerBuilder.setZkMigrationEnabled(true);
+                    }).
+                    setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV0, "test"));
+
+            QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
+            QuorumController active = controlEnv.activeController();
+            assertEquals(ZkMigrationState.NONE, active.appendReadEvent("read migration state", OptionalLong.empty(),
+                () -> active.featureControl().zkMigrationState()).get(30, TimeUnit.SECONDS));
+            assertThrows(FaultHandlerException.class, controlEnv::close);
+        }
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
index db2ff8300d7..47ffcc3589f 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
@@ -45,7 +45,8 @@ public class ControllerMetadataMetricsTest {
                         "kafka.controller:type=KafkaController,name=GlobalTopicCount",
                         "kafka.controller:type=KafkaController,name=MetadataErrorCount",
                         "kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
-                        "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"
+                        "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
+                        "kafka.controller:type=KafkaController,name=ZkMigrationState"
                     )));
             }
             ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "KafkaController",
diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index 8c63cbc516d..155973b4a4f 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -21,17 +21,21 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RecordListWriter;
 import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
@@ -46,7 +50,7 @@ public class FeaturesImageTest {
         map1.put("foo", (short) 2);
         map1.put("bar", (short) 1);
         map1.put("baz", (short) 8);
-        IMAGE1 = new FeaturesImage(map1, MetadataVersion.latest());
+        IMAGE1 = new FeaturesImage(map1, MetadataVersion.latest(), ZkMigrationState.NONE);
 
         DELTA1_RECORDS = new ArrayList<>();
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
@@ -64,7 +68,7 @@ public class FeaturesImageTest {
 
         Map<String, Short> map2 = new HashMap<>();
         map2.put("foo", (short) 3);
-        IMAGE2 = new FeaturesImage(map2, MetadataVersion.latest());
+        IMAGE2 = new FeaturesImage(map2, MetadataVersion.latest(), ZkMigrationState.NONE);
     }
 
     @Test
@@ -89,10 +93,21 @@ public class FeaturesImageTest {
 
     private void testToImageAndBack(FeaturesImage image) throws Throwable {
         RecordListWriter writer = new RecordListWriter();
-        image.write(writer, new ImageWriterOptions.Builder().build());
+        image.write(writer, new ImageWriterOptions.Builder().setMetadataVersion(image.metadataVersion()).build());
         FeaturesDelta delta = new FeaturesDelta(FeaturesImage.EMPTY);
         RecordTestUtils.replayAll(delta, writer.records());
         FeaturesImage nextImage = delta.apply();
         assertEquals(image, nextImage);
     }
+
+    @Test
+    public void testEmpty() {
+        assertTrue(FeaturesImage.EMPTY.isEmpty());
+        assertFalse(new FeaturesImage(Collections.singletonMap("foo", (short) 1),
+            FeaturesImage.EMPTY.metadataVersion(), FeaturesImage.EMPTY.zkMigrationState()).isEmpty());
+        assertFalse(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(),
+            MetadataVersion.IBP_3_3_IV0, FeaturesImage.EMPTY.zkMigrationState()).isEmpty());
+        assertFalse(new FeaturesImage(FeaturesImage.EMPTY.finalizedVersions(),
+            FeaturesImage.EMPTY.metadataVersion(), ZkMigrationState.MIGRATION).isEmpty());
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
index e03c4eb0bff..316ff266e2f 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
@@ -48,7 +48,7 @@ public class SnapshotGeneratorTest {
     static class MockEmitter implements SnapshotGenerator.Emitter {
         private final CountDownLatch latch = new CountDownLatch(1);
         private final List<MetadataImage> images = new CopyOnWriteArrayList<>();
-        private RuntimeException problem = null;
+        private volatile RuntimeException problem = null;
 
         MockEmitter setReady() {
             latch.countDown();
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 04da77b0fc3..20569dc1173 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -33,6 +33,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -87,6 +88,13 @@ public class RecordTestUtils {
         }
     }
 
+    public static void replayOne(
+        Object target,
+        ApiMessageAndVersion recordAndVersion
+    ) {
+        replayAll(target, Collections.singletonList(recordAndVersion));
+    }
+
     /**
      * Replay a list of record batches.
      *
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 0fb5d8c6fcb..5d927359a5e 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -63,12 +64,12 @@ public class KRaftMigrationDriverTest {
 
         @Override
         public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
-            return null;
+            return CompletableFuture.completedFuture(null);
         }
 
         @Override
-        public OffsetAndEpoch completeMigration() {
-            return new OffsetAndEpoch(100, 1);
+        public CompletableFuture<OffsetAndEpoch> completeMigration() {
+            return CompletableFuture.completedFuture(new OffsetAndEpoch(100, 1));
         }
 
         @Override
@@ -81,6 +82,7 @@ public class KRaftMigrationDriverTest {
 
         private final Set<Integer> brokerIds;
         public final Map<ConfigResource, Map<String, String>> capturedConfigs = new HashMap<>();
+        private ZkMigrationLeadershipState state = null;
 
         public CapturingMigrationClient(Set<Integer> brokerIdsInZk) {
             this.brokerIds = brokerIdsInZk;
@@ -88,21 +90,27 @@ public class KRaftMigrationDriverTest {
 
         @Override
         public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
-            return initialState;
+            if (state == null) {
+                state = initialState;
+            }
+            return state;
         }
 
         @Override
         public ZkMigrationLeadershipState setMigrationRecoveryState(ZkMigrationLeadershipState state) {
+            this.state = state;
             return state;
         }
 
         @Override
         public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
+            this.state = state;
             return state;
         }
 
         @Override
         public ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state) {
+            this.state = state;
             return state;
         }
 
@@ -113,6 +121,7 @@ public class KRaftMigrationDriverTest {
             Map<Integer, PartitionRegistration> topicPartitions,
             ZkMigrationLeadershipState state
         ) {
+            this.state = state;
             return state;
         }
 
@@ -121,6 +130,7 @@ public class KRaftMigrationDriverTest {
             Map<String, Map<Integer, PartitionRegistration>> topicPartitions,
             ZkMigrationLeadershipState state
         ) {
+            this.state = state;
             return state;
         }
 
@@ -131,6 +141,7 @@ public class KRaftMigrationDriverTest {
             ZkMigrationLeadershipState state
         ) {
             capturedConfigs.computeIfAbsent(configResource, __ -> new HashMap<>()).putAll(configMap);
+            this.state = state;
             return state;
         }
 
@@ -140,6 +151,7 @@ public class KRaftMigrationDriverTest {
             Map<String, Double> quotas,
             ZkMigrationLeadershipState state
         ) {
+            this.state = state;
             return state;
         }
 
@@ -148,6 +160,7 @@ public class KRaftMigrationDriverTest {
             long nextProducerId,
             ZkMigrationLeadershipState state
         ) {
+            this.state = state;
             return state;
         }
 
@@ -157,6 +170,7 @@ public class KRaftMigrationDriverTest {
             List<AccessControlEntry> deletedAcls,
             ZkMigrationLeadershipState state
         ) {
+            this.state = state;
             return state;
         }
 
@@ -166,6 +180,7 @@ public class KRaftMigrationDriverTest {
             List<AccessControlEntry> addedAcls,
             ZkMigrationLeadershipState state
         ) {
+            this.state = state;
             return state;
         }
 
@@ -290,6 +305,7 @@ public class KRaftMigrationDriverTest {
         MetadataDelta delta = new MetadataDelta(image);
 
         driver.start();
+        delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
         delta.replay(zkBrokerRecord(1));
         delta.replay(zkBrokerRecord(2));
         delta.replay(zkBrokerRecord(3));
@@ -371,6 +387,7 @@ public class KRaftMigrationDriverTest {
             MetadataDelta delta = new MetadataDelta(image);
 
             driver.start();
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(1));
             delta.replay(zkBrokerRecord(2));
             delta.replay(zkBrokerRecord(3));
@@ -393,4 +410,42 @@ public class KRaftMigrationDriverTest {
             }
         }
     }
+
+    @Test
+    public void testSkipWaitForBrokersInDualWrite() throws Exception {
+        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+        CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet());
+        MockFaultHandler faultHandler = new MockFaultHandler("testSkipWaitForBrokersInDualWrite");
+        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
+                3000,
+                new NoOpRecordConsumer(),
+                migrationClient,
+                metadataPropagator,
+                metadataPublisher -> { },
+                faultHandler
+        )) {
+            MetadataImage image = MetadataImage.EMPTY;
+            MetadataDelta delta = new MetadataDelta(image);
+
+            // Fake a complete migration with ZK client
+            migrationClient.setMigrationRecoveryState(
+                ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));
+
+            driver.start();
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            delta.replay(ZkMigrationState.MIGRATION.toRecord().message());
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
+            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
+                new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
+
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+        }
+    }
 }
\ No newline at end of file
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index a72c7731aff..0e125359f68 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -67,6 +67,7 @@ RUN mkdir -p "/opt/kafka-3.0.2" && chmod a+rw /opt/kafka-3.0.2 && curl -s "$KAFK
 RUN mkdir -p "/opt/kafka-3.1.2" && chmod a+rw /opt/kafka-3.1.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.1.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.1.2"
 RUN mkdir -p "/opt/kafka-3.2.3" && chmod a+rw /opt/kafka-3.2.3 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.2.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.2.3"
 RUN mkdir -p "/opt/kafka-3.3.1" && chmod a+rw /opt/kafka-3.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.3.1"
+RUN mkdir -p "/opt/kafka-3.4.0" && chmod a+rw /opt/kafka-3.4.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.4.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.4.0"
 
 # Streams test dependencies
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
@@ -88,6 +89,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.0.2-test.jar" -o /opt/kafka-3.0.2/lib
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.1.2-test.jar" -o /opt/kafka-3.1.2/libs/kafka-streams-3.1.2-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.2.3-test.jar" -o /opt/kafka-3.2.3/libs/kafka-streams-3.2.3-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.3.1-test.jar" -o /opt/kafka-3.3.1/libs/kafka-streams-3.3.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.4.0-test.jar" -o /opt/kafka-3.4.0/libs/kafka-streams-3.4.0-test.jar
 
 # The version of Kibosh to use for testing.
 # If you update this, also update vagrant/base.sh
diff --git a/tests/kafkatest/tests/core/zookeeper_migration_test.py b/tests/kafkatest/tests/core/zookeeper_migration_test.py
index a9a75eb8086..24530095871 100644
--- a/tests/kafkatest/tests/core/zookeeper_migration_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_migration_test.py
@@ -17,6 +17,8 @@ from functools import partial
 import time
 
 from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+from ducktape.errors import TimeoutError
 
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
@@ -26,7 +28,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
-from kafkatest.version import DEV_BRANCH
+from kafkatest.version import DEV_BRANCH, V_3_4_0
 
 
 class TestMigration(ProduceConsumeValidateTest):
@@ -115,3 +117,140 @@ class TestMigration(ProduceConsumeValidateTest):
                                         message_validator=is_int, version=DEV_BRANCH)
 
         self.run_produce_consume_validate(core_test_action=self.do_migration)
+
+    @parametrize(metadata_quorum=isolated_kraft)
+    def test_pre_migration_mode_3_4(self, metadata_quorum):
+        """
+        Start a KRaft quorum in 3.4 without migrations enabled. Since we were not correctly writing
+        ZkMigrationStateRecord in 3.4, there will be no ZK migration state in the log.
+
+        When upgrading to 3.5+, the controller should see that there are records in the log and
+        automatically bootstrap a ZkMigrationStateRecord(NONE) into the log (indicating that this
+        cluster was created in KRaft mode).
+
+        This test ensures that even if we enable migrations after the upgrade to 3.5, no migration
+        is able to take place.
+        """
+        # One-node ZK is only needed so the brokers can be configured with a ZK connection;
+        # the cluster itself starts in KRaft mode (migrations disabled below).
+        self.zk = ZookeeperService(self.test_context, num_nodes=1, version=V_3_4_0)
+        self.zk.start()
+
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=3,
+                                  zk=self.zk,
+                                  allow_zk_with_kraft=True,
+                                  version=V_3_4_0,
+                                  server_prop_overrides=[["zookeeper.metadata.migration.enable", "false"]],
+                                  topics={self.topic: {"partitions": self.partitions,
+                                                       "replication-factor": self.replication_factor,
+                                                       'configs': {"min.insync.replicas": 2}}})
+        self.kafka.start()
+
+        # Now reconfigure the cluster as if we're trying to do a migration
+        self.kafka.server_prop_overrides.clear()
+        self.kafka.server_prop_overrides.extend([
+            ["zookeeper.metadata.migration.enable", "true"]
+        ])
+
+        self.logger.info("Performing rolling upgrade.")
+        # Roll each controller onto DEV_BRANCH (3.5+) with migrations now enabled. Because the
+        # metadata log proves this cluster was created in KRaft mode, each upgraded controller
+        # is expected to halt rather than enter migration mode.
+        for node in self.kafka.controller_quorum.nodes:
+            self.logger.info("Stopping controller node %s" % node.account.hostname)
+            self.kafka.controller_quorum.stop_node(node)
+            node.version = DEV_BRANCH
+            self.logger.info("Restarting controller node %s" % node.account.hostname)
+            self.kafka.controller_quorum.start_node(node)
+            # Controller should crash
+
+        # Check the controller's logs for the error message about the migration state.
+        # First confirm each controller actually died, then scan its captured stdout/stderr
+        # from the beginning (offset = 0) for the expected FATAL message. One matching node
+        # is sufficient, hence the break; TimeoutError just means "not on this node".
+        saw_expected_error = False
+        for node in self.kafka.controller_quorum.nodes:
+            wait_until(lambda: not self.kafka.controller_quorum.alive(node), timeout_sec=60,
+                       backoff_sec=1, err_msg="Controller did not halt in the expected amount of time")
+            with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
+                monitor.offset = 0
+                try:
+                    # Shouldn't have to wait too long to see this log message after startup
+                    monitor.wait_until(
+                        "Should not have ZK migrations enabled on a cluster that was created in KRaft mode.",
+                        timeout_sec=10.0, backoff_sec=.25,
+                        err_msg=""
+                    )
+                    saw_expected_error = True
+                    break
+                except TimeoutError:
+                    continue
+
+        assert saw_expected_error, "Did not see expected ERROR log in the controller logs"
+
+    def test_upgrade_after_3_4_migration(self):
+        """
+        Perform a migration on version 3.4.0. Then do a rolling upgrade to 3.5+ and ensure we see
+        the correct migration state in the log.
+        """
+        # NOTE(review): ServiceQuorumInfo, zk, isolated_kraft and CLUSTER_ID are presumably
+        # imported earlier in this file (outside this hunk) — verify against the full module.
+        zk_quorum = partial(ServiceQuorumInfo, zk)
+        self.zk = ZookeeperService(self.test_context, num_nodes=1, version=V_3_4_0)
+        # Brokers start in ZK mode on 3.4.0 with the migration flag enabled.
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=3,
+                                  zk=self.zk,
+                                  version=V_3_4_0,
+                                  quorum_info_provider=zk_quorum,
+                                  allow_zk_with_kraft=True,
+                                  server_prop_overrides=[["zookeeper.metadata.migration.enable", "true"]])
+
+        # A separate one-node KRaft controller quorum (isolated from the brokers) drives the migration.
+        remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
+        controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=V_3_4_0,
+                                  allow_zk_with_kraft=True,
+                                  isolated_kafka=self.kafka,
+                                  server_prop_overrides=[["zookeeper.connect", self.zk.connect_setting()],
+                                                         ["zookeeper.metadata.migration.enable", "true"]],
+                                  quorum_info_provider=remote_quorum)
+
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.zk.start()
+
+        controller.start()
+
+        # Seed ZK with a fixed cluster id so the KRaft controller and ZK brokers agree on it,
+        # which is a precondition for the migration to begin.
+        self.logger.info("Pre-generating clusterId for ZK.")
+        cluster_id_json = """{"version": "1", "id": "%s"}""" % CLUSTER_ID
+        self.zk.create(path="/cluster")
+        self.zk.create(path="/cluster/id", value=cluster_id_json)
+        self.kafka.reconfigure_zk_for_migration(controller)
+        self.kafka.start()
+
+        topic_cfg = {
+            "topic": self.topic,
+            "partitions": self.partitions,
+            "replication-factor": self.replication_factor,
+            "configs": {"min.insync.replicas": 2}
+        }
+        self.kafka.create_topic(topic_cfg)
+
+        # Now we're in dual-write mode. The 3.4 controller will have written a PRE_MIGRATION record (1) into the log.
+        # We now upgrade the controller to 3.5+ where 1 is redefined as MIGRATION.
+        # NOTE(review): stop/start go through self.kafka.controller_quorum while iterating
+        # controller.nodes — with an isolated quorum these should be the same service; confirm.
+        for node in controller.nodes:
+            self.logger.info("Stopping controller node %s" % node.account.hostname)
+            self.kafka.controller_quorum.stop_node(node)
+            node.version = DEV_BRANCH
+            self.logger.info("Restarting controller node %s" % node.account.hostname)
+            self.kafka.controller_quorum.start_node(node)
+            self.wait_until_rejoin()
+            self.logger.info("Successfully restarted controller node %s" % node.account.hostname)
+
+        # Check the controller's logs for the INFO message that we're still in the migration state.
+        # Scan each node's captured stdout/stderr from the start (offset = 0); one matching node
+        # is enough (break), and TimeoutError only means the message wasn't on that node.
+        saw_expected_log = False
+        for node in self.kafka.controller_quorum.nodes:
+            with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
+                monitor.offset = 0
+                try:
+                    # Shouldn't have to wait too long to see this log message after startup
+                    monitor.wait_until(
+                        "Staying in the ZK migration",
+                        timeout_sec=10.0, backoff_sec=.25,
+                        err_msg=""
+                    )
+                    saw_expected_log = True
+                    break
+                except TimeoutError:
+                    continue
+
+        assert saw_expected_log, "Did not see expected INFO log after upgrading from a 3.4 migration"