You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/26 01:21:46 UTC

[kafka] branch 3.3 updated: KAFKA-14177: Correctly support older kraft versions without FeatureLevelRecord (#12513)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 138116ee37a KAFKA-14177: Correctly support older kraft versions without FeatureLevelRecord (#12513)
138116ee37a is described below

commit 138116ee37ae623b1ea2e8f431e4d4b74f83f966
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Aug 25 18:12:31 2022 -0700

    KAFKA-14177: Correctly support older kraft versions without FeatureLevelRecord (#12513)
    
    The main changes here are ensuring that we always have a metadata.version record in the log, making
    sure that the bootstrap file can be used for records other than the metadata.version record (for
    example, we will want to put SCRAM initialization records there), and fixing some bugs.
    
    If no feature level record is in the log and the IBP is less than 3.3IV0, then we assume the minimum KRaft
    version for all records in the log.
    
    Fix some issues related to initializing new clusters. If there are no records in the log at all,
    then insert the bootstrap records in a single batch. If there are records, but no metadata version,
    process the existing records as though they were metadata.version 3.3IV0 and then append a metadata
    version record setting version 3.3IV0.  Previously, we were not clearly distinguishing between the
    case where the metadata log was empty, and the case where we just needed to add a metadata.version
    record.
    
    Refactor BootstrapMetadata into an immutable class which contains a 3-tuple of metadata version,
    record list, and source. The source field is used to log where the bootstrap metadata was obtained
    from. This could be a bootstrap file, the static configuration, or just the software defaults.
    Move the logic for reading and writing bootstrap files into BootstrapDirectory.java.
    
    Add LogReplayTracker, which tracks whether the log is empty.
    
    Fix a bug in FeatureControlManager where it was possible to use a "downgrade" operation to
    transition to a newer version. Do not store whether we have seen a metadata version or not in
    FeatureControlManager, since that is now handled by LogReplayTracker.
    
    Introduce BatchFileReader, which is a simple way of reading a file containing batches of snapshots
    that does not require spawning a thread. Rename SnapshotFileWriter to BatchFileWriter to be
    consistent, and to reflect the fact that bootstrap files aren't snapshots.
    
    QuorumController#processBrokerHeartbeat: add an explanatory comment.
    
    Reviewers: David Arthur <mu...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 checkstyle/import-control.xml                      |   3 +
 checkstyle/suppressions.xml                        |   2 +-
 .../main/scala/kafka/server/ControllerServer.scala |   3 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  21 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   4 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  |  26 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |   7 +-
 .../transaction/ProducerIdsIntegrationTest.scala   |   2 +-
 .../kafka/server/KRaftClusterTest.scala            |  33 +--
 .../server/MetadataVersionIntegrationTest.scala    |   8 +-
 .../kafka/server/QuorumTestHarness.scala           |  12 +-
 .../unit/kafka/server/KafkaRaftServerTest.scala    |  21 +-
 .../metadata/BrokerMetadataListenerTest.scala      |   2 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  12 +-
 .../apache/kafka/controller/BootstrapMetadata.java | 215 ----------------
 .../kafka/controller/ClusterControlManager.java    |  46 +++-
 .../kafka/controller/FeatureControlManager.java    |  54 ++--
 .../apache/kafka/controller/LogReplayTracker.java  |  73 ++++++
 .../apache/kafka/controller/QuorumController.java  | 229 +++++++++--------
 .../java/org/apache/kafka/image/FeaturesImage.java |  10 +-
 .../metadata/bootstrap/BootstrapDirectory.java     | 124 ++++++++++
 .../metadata/bootstrap/BootstrapMetadata.java      | 140 +++++++++++
 .../kafka/metadata/util/BatchFileReader.java       | 184 ++++++++++++++
 ...napshotFileWriter.java => BatchFileWriter.java} |  12 +-
 .../apache/kafka/timeline/SnapshotRegistry.java    |   9 +-
 .../common/metadata/FeatureLevelRecord.json        |   3 +
 .../kafka/controller/BootstrapMetadataTest.java    |  95 -------
 .../controller/ClusterControlManagerTest.java      |  73 +++++-
 .../controller/FeatureControlManagerTest.java      | 182 ++++++++++----
 .../kafka/controller/LogReplayTrackerTest.java     |  37 +++
 .../kafka/controller/QuorumControllerTest.java     | 273 +++++++++++++--------
 .../kafka/controller/QuorumControllerTestEnv.java  |  28 +--
 .../metadata/bootstrap/BootstrapDirectoryTest.java | 117 +++++++++
 .../metadata/bootstrap/BootstrapMetadataTest.java  |  91 +++++++
 .../kafka/timeline/SnapshotRegistryTest.java       |   2 +-
 .../kafka/timeline/SnapshottableHashTableTest.java |   2 +-
 .../kafka/server/common/MetadataVersion.java       |   9 +
 .../kafka/server/common/MetadataVersionTest.java   |  23 +-
 38 files changed, 1482 insertions(+), 705 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d24d1e7e5e9..45dc28d9e3a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -277,6 +277,9 @@
       <allow pkg="org.apache.kafka.controller" />
       <allow pkg="org.apache.kafka.metadata" />
     </subpackage>
+    <subpackage name="bootstrap">
+      <allow pkg="org.apache.kafka.snapshot" />
+    </subpackage>
     <subpackage name="fault">
       <allow pkg="org.apache.kafka.server.fault" />
     </subpackage>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a891cf647f7..ef3947ad91a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -294,7 +294,7 @@
 
     <!-- metadata -->
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
+              files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest|ClusterControlManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 19a6e307d62..441171fe1d9 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
-import org.apache.kafka.controller.{BootstrapMetadata, Controller, ControllerMetrics, QuorumController, QuorumFeatures}
+import org.apache.kafka.controller.{Controller, ControllerMetrics, QuorumController, QuorumFeatures}
 import org.apache.kafka.metadata.KafkaConfigSchema
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -45,6 +45,7 @@ import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 8ce0bc18613..63f77d1a43d 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -29,16 +29,16 @@ import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{KafkaException, Uuid}
-import org.apache.kafka.controller.{BootstrapMetadata, QuorumControllerMetrics}
+import org.apache.kafka.controller.QuorumControllerMetrics
+import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
 import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
 import org.apache.kafka.raft.RaftConfig
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.{LoggingFaultHandler, ProcessExitingFaultHandler}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
-import java.nio.file.Paths
+import java.util.Optional
 import scala.collection.Seq
-import scala.compat.java8.FunctionConverters.asJavaSupplier
 import scala.jdk.CollectionConverters._
 
 /**
@@ -208,16 +208,9 @@ object KafkaRaftServer {
           "If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
     }
 
-    // Load the bootstrap metadata file. In the case of an upgrade from older KRaft where there is no bootstrap metadata,
-    // read the IBP from config in order to bootstrap the equivalent metadata version.
-    def getUserDefinedIBPVersionOrThrow(): MetadataVersion = {
-      if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
-        MetadataVersion.fromVersionString(config.interBrokerProtocolVersionString)
-      } else {
-        throw new KafkaException(s"Cannot upgrade from KRaft version prior to 3.3 without first setting ${KafkaConfig.InterBrokerProtocolVersionProp} on each broker.")
-      }
-    }
-    val bootstrapMetadata = BootstrapMetadata.load(Paths.get(config.metadataLogDir), asJavaSupplier(() => getUserDefinedIBPVersionOrThrow()))
+    val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir,
+      Optional.ofNullable(config.interBrokerProtocolVersionString))
+    val bootstrapMetadata = bootstrapDirectory.read()
 
     (metaProperties, bootstrapMetadata, offlineDirs.toSeq)
   }
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index e4d31baf1b6..c82523eff4b 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -30,8 +30,8 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT
 import org.apache.kafka.common.protocol.ByteBufferAccessor
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.controller.BootstrapMetadata
 import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
 import org.apache.kafka.snapshot.Snapshots
 
 import scala.jdk.CollectionConverters._
@@ -254,7 +254,7 @@ object DumpLogSegments {
       val startOffset = file.getName.split("\\.")(0).toLong
       println(s"Log starting offset: $startOffset")
     } else if (file.getName.endsWith(Snapshots.SUFFIX)) {
-      if (file.getName == BootstrapMetadata.BOOTSTRAP_FILE) {
+      if (file.getName == BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME) {
         println("KRaft bootstrap snapshot")
       } else {
         val path = Snapshots.parse(file.toPath).get()
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index a96275cc27c..0f798e24fc2 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -26,9 +26,10 @@ import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
 import net.sourceforge.argparse4j.inf.Namespace
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
 import org.apache.kafka.server.common.MetadataVersion
 
+import java.util.Optional
 import scala.collection.mutable
 
 object StorageTool extends Logging {
@@ -47,7 +48,7 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace)
+          val metadataVersion = getMetadataVersion(namespace, Option(config.get.interBrokerProtocolVersionString))
           if (!metadataVersion.isKRaftSupported) {
             throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
           }
@@ -113,10 +114,18 @@ object StorageTool extends Logging {
 
   def configToSelfManagedMode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
 
-  def getMetadataVersion(namespace: Namespace): MetadataVersion = {
+  def getMetadataVersion(
+    namespace: Namespace,
+    defaultVersionString: Option[String]
+  ): MetadataVersion = {
+    val defaultValue = defaultVersionString match {
+      case Some(versionString) => MetadataVersion.fromVersionString(versionString)
+      case None => MetadataVersion.latest()
+    }
+
     Option(namespace.getString("release_version"))
       .map(ver => MetadataVersion.fromVersionString(ver))
-      .getOrElse(MetadataVersion.latest())
+      .getOrElse(defaultValue)
   }
 
   def infoCommand(stream: PrintStream, selfManagedMode: Boolean, directories: Seq[String]): Int = {
@@ -233,7 +242,7 @@ object StorageTool extends Logging {
       if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) {
           true
       } else if (!ignoreFormatted) {
-        throw new TerseFailure(s"Log directory ${directory} is already formatted. " +
+        throw new TerseFailure(s"Log directory $directory is already formatted. " +
           "Use --ignore-formatted to ignore this directory and format the others.")
       } else {
         false
@@ -247,14 +256,15 @@ object StorageTool extends Logging {
         Files.createDirectories(Paths.get(directory))
       } catch {
         case e: Throwable => throw new TerseFailure(s"Unable to create storage " +
-          s"directory ${directory}: ${e.getMessage}")
+          s"directory $directory: ${e.getMessage}")
       }
       val metaPropertiesPath = Paths.get(directory, "meta.properties")
       val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
       checkpoint.write(metaProperties.toProperties)
 
-      val bootstrapMetadata = BootstrapMetadata.create(metadataVersion)
-      BootstrapMetadata.write(bootstrapMetadata, Paths.get(directory))
+      val bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "format command")
+      val bootstrapDirectory = new BootstrapDirectory(directory, Optional.empty())
+      bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
 
       stream.println(s"Formatting ${directory} with metadata.version ${metadataVersion}.")
     })
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 139b05fa540..417c083457f 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -35,10 +35,10 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.controller.BootstrapMetadata;
 import org.apache.kafka.controller.Controller;
 import org.apache.kafka.controller.MockControllerMetrics;
 import org.apache.kafka.metadata.MetadataRecordSerde;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.raft.RaftConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -186,7 +186,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     String threadNamePrefix = String.format("controller%d_", node.id());
                     MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
                     TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
-                    BootstrapMetadata bootstrapMetadata = BootstrapMetadata.create(nodes.bootstrapMetadataVersion());
+                    BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
+                        fromVersion(nodes.bootstrapMetadataVersion(), "testkit");
                     KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
                         metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
                         Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
@@ -392,7 +393,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     StorageTool.formatCommand(out,
                             JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
                             properties,
-                            MetadataVersion.MINIMUM_KRAFT_VERSION,
+                            MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,
                             false);
                 } finally {
                     for (String line : stream.toString().split(String.format("%n"))) {
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index 7d3203e9309..558f0041e0a 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -46,7 +46,7 @@ class ProducerIdsIntegrationTest {
   @ClusterTests(Array(
     new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
     new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
-    new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV1)
+    new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
   ))
   def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
     verifyUniqueIds(clusterInstance)
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index c550553917b..a360ee15a22 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -296,13 +296,14 @@ class KRaftClusterTest {
 
   @Test
   def testCreateClusterInvalidMetadataVersion(): Unit = {
-    assertThrows(classOf[IllegalArgumentException], () => {
-      new KafkaClusterTestKit.Builder(
-        new TestKitNodes.Builder().
-          setBootstrapMetadataVersion(MetadataVersion.IBP_2_7_IV0).
-          setNumBrokerNodes(1).
-          setNumControllerNodes(1).build()).build()
-    })
+    assertEquals("Bootstrap metadata versions before 3.3-IV0 are not supported. Can't load " +
+      "metadata from testkit", assertThrows(classOf[RuntimeException], () => {
+        new KafkaClusterTestKit.Builder(
+          new TestKitNodes.Builder().
+            setBootstrapMetadataVersion(MetadataVersion.IBP_2_7_IV0).
+            setNumBrokerNodes(1).
+            setNumControllerNodes(1).build()).build()
+    }).getMessage)
   }
 
   private def doOnStartedKafkaCluster(numControllerNodes: Int = 1,
@@ -426,15 +427,17 @@ class KRaftClusterTest {
         }, "Timed out waiting for replica assignments for topic foo. " +
           s"Wanted: ${expectedMapping}. Got: ${currentMapping}")
 
-        checkReplicaManager(
-          cluster,
-          List(
-            (0, List(true, true, false, true)),
-            (1, List(true, true, false, true)),
-            (2, List(true, true, true, true)),
-            (3, List(false, false, true, true))
+        TestUtils.retry(60000) {
+          checkReplicaManager(
+            cluster,
+            List(
+              (0, List(true, true, false, true)),
+              (1, List(true, true, false, true)),
+              (2, List(true, true, true, true)),
+              (3, List(false, false, true, true))
+            )
           )
-        )
+        }
       } finally {
         admin.close()
       }
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index c060e3a6daa..5270f627bfd 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -32,9 +32,9 @@ import scala.jdk.CollectionConverters._
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class MetadataVersionIntegrationTest {
   @ClusterTests(value = Array(
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV1),
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_1_IV0),
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_2_IV0)
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0),
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1),
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV2)
   ))
   def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
     val admin = clusterInstance.createAdminClient()
@@ -44,7 +44,7 @@ class MetadataVersionIntegrationTest {
     assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
 
     // Update to new version
-    val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
+    val updateVersion = MetadataVersion.IBP_3_3_IV3.featureLevel.shortValue
     val updateResult = admin.updateFeatures(
       Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
     updateResult.all().get()
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index c4ca966f9ab..6bb2e769cda 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -33,8 +33,9 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Exit, Time}
-import org.apache.kafka.controller.{BootstrapMetadata, QuorumControllerMetrics}
+import org.apache.kafka.controller.QuorumControllerMetrics
 import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
@@ -47,7 +48,6 @@ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, T
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable}
 import scala.compat.java8.OptionConverters._
-import scala.jdk.CollectionConverters._
 
 trait QuorumImplementation {
   def createBroker(
@@ -142,10 +142,10 @@ abstract class QuorumTestHarness extends Logging {
 
   protected def metadataVersion: MetadataVersion = MetadataVersion.latest()
 
-  val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
+  private var testInfo: TestInfo = _
+  private var implementation: QuorumImplementation = _
 
-  private var testInfo: TestInfo = null
-  private var implementation: QuorumImplementation = null
+  val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
 
   def isKRaftTest(): Boolean = {
     TestInfoUtils.isKRaft(testInfo)
@@ -321,7 +321,7 @@ abstract class QuorumTestHarness extends Logging {
         controllerQuorumVotersFuture = controllerQuorumVotersFuture,
         configSchema = KafkaRaftServer.configSchema,
         raftApiVersions = raftManager.apiVersions,
-        bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava),
+        bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "test harness"),
         metadataFaultHandler = faultHandler,
         fatalFaultHandler = faultHandler,
       )
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 17483e58a6a..1fc98fac3ea 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -18,12 +18,12 @@ package kafka.server
 
 import java.io.File
 import java.nio.file.Files
-import java.util.Properties
+import java.util.{Optional, Properties}
 import kafka.common.{InconsistentBrokerMetadataException, InconsistentNodeIdException}
 import kafka.log.UnifiedLog
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
@@ -97,8 +97,8 @@ class KafkaRaftServerTest {
   }
 
   private def writeBootstrapMetadata(logDir: File, metadataVersion: MetadataVersion): Unit = {
-    val bootstrapMetadata = BootstrapMetadata.create(metadataVersion)
-    BootstrapMetadata.write(bootstrapMetadata, logDir.toPath)
+    val bootstrapDirectory = new BootstrapDirectory(logDir.toString(), Optional.empty())
+    bootstrapDirectory.writeBinaryFile(BootstrapMetadata.fromVersion(metadataVersion, "test"))
   }
 
   @Test
@@ -235,14 +235,14 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
     configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
     configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    configProperties.put(KafkaConfig.InterBrokerProtocolVersionProp, "3.2")
+    configProperties.put(KafkaConfig.InterBrokerProtocolVersionProp, "3.3-IV1")
 
     val (loadedMetaProperties, bootstrapMetadata, offlineDirs) =
       invokeLoadMetaProperties(metaProperties, configProperties, None)
 
     assertEquals(metaProperties, loadedMetaProperties)
     assertEquals(Seq.empty, offlineDirs)
-    assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.IBP_3_2_IV0)
+    assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.IBP_3_3_IV1)
   }
 
   @Test
@@ -262,8 +262,11 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     configProperties.put(KafkaConfig.LogDirProp, logDir.getAbsolutePath)
 
-    val config = KafkaConfig.fromProps(configProperties)
-    assertEquals("Cannot upgrade from KRaft version prior to 3.3 without first setting inter.broker.protocol.version on each broker.",
-      assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config)).getMessage)
+    val (loadedMetaProperties, bootstrapMetadata, offlineDirs) =
+      invokeLoadMetaProperties(metaProperties, configProperties, None)
+
+    assertEquals(metaProperties, loadedMetaProperties)
+    assertEquals(Seq.empty, offlineDirs)
+    assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.latest())
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 6c8c2599d29..92e9fe9a2ae 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -82,7 +82,7 @@ class BrokerMetadataListenerTest {
         )
       )
       val imageRecords = listener.getImageRecords().get()
-      assertEquals(1, imageRecords.size())
+      assertEquals(0, imageRecords.size())
       assertEquals(100L, listener.highestMetadataOffset)
       assertEquals(0L, metrics.lastAppliedRecordOffset.get)
       assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 0e11471527a..cbffbf0292e 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -191,18 +191,26 @@ Found problem:
   @Test
   def testDefaultMetadataVersion(): Unit = {
     val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
-    val mv = StorageTool.getMetadataVersion(namespace)
+    val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = None)
     assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
       "Expected the default metadata.version to be the latest version")
   }
 
+  @Test
+  def testConfiguredMetadataVersion(): Unit = {
+    val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
+    val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = Some(MetadataVersion.IBP_3_3_IV2.toString))
+    assertEquals(MetadataVersion.IBP_3_3_IV2.featureLevel(), mv.featureLevel(),
+      "Expected the default metadata.version to be 3.3-IV2")
+  }
+
   @Test
   def testMetadataVersionFlags(): Unit = {
     def parseMetadataVersion(strings: String*): MetadataVersion = {
       var args = mutable.Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ")
       args ++= strings
       val namespace = StorageTool.parseArguments(args.toArray)
-      StorageTool.getMetadataVersion(namespace)
+      StorageTool.getMetadataVersion(namespace, defaultVersionString = None)
     }
 
     var mv = parseMetadataVersion("--release-version", "3.0")
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
deleted file mode 100644
index d9d0651a193..00000000000
--- a/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.MetadataRecordType;
-import org.apache.kafka.metadata.util.SnapshotFileReader;
-import org.apache.kafka.metadata.util.SnapshotFileWriter;
-import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.BatchReader;
-import org.apache.kafka.raft.RaftClient;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.snapshot.SnapshotReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
-import java.util.stream.Stream;
-
-
-/**
- * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the
- * format is the same as a KRaft snapshot.
- */
-public class BootstrapMetadata {
-    private static final Logger log = LoggerFactory.getLogger(BootstrapMetadata.class);
-
-    public static final String BOOTSTRAP_FILE = "bootstrap.checkpoint";
-
-    private final MetadataVersion metadataVersion;
-
-    private final List<ApiMessageAndVersion> records;
-
-    BootstrapMetadata(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
-        this.metadataVersion = metadataVersion;
-        this.records = Collections.unmodifiableList(records);
-    }
-
-    public MetadataVersion metadataVersion() {
-        return this.metadataVersion;
-    }
-
-    public List<ApiMessageAndVersion> records() {
-        return records;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        BootstrapMetadata metadata = (BootstrapMetadata) o;
-        return metadataVersion == metadata.metadataVersion;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(metadataVersion);
-    }
-
-    @Override
-    public String toString() {
-        return "BootstrapMetadata{" +
-            "metadataVersion=" + metadataVersion +
-            '}';
-    }
-
-    /**
-     * A raft client listener that simply collects all of the commits and snapshots into a mapping of
-     * metadata record type to list of records.
-     */
-    private static class BootstrapListener implements RaftClient.Listener<ApiMessageAndVersion> {
-        private final List<ApiMessageAndVersion> records = new ArrayList<>();
-
-        @Override
-        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
-            try {
-                while (reader.hasNext()) {
-                    Batch<ApiMessageAndVersion> batch = reader.next();
-                    records.addAll(batch.records());
-                }
-            } finally {
-                reader.close();
-            }
-        }
-
-        @Override
-        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-            try {
-                while (reader.hasNext()) {
-                    Batch<ApiMessageAndVersion> batch = reader.next();
-                    for (ApiMessageAndVersion messageAndVersion : batch) {
-                        records.add(messageAndVersion);
-                    }
-                }
-            } finally {
-                reader.close();
-            }
-        }
-    }
-
-    public static BootstrapMetadata create(MetadataVersion metadataVersion) {
-        return create(metadataVersion, new ArrayList<>());
-    }
-
-    public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
-        if (!metadataVersion.isKRaftSupported()) {
-            throw new IllegalArgumentException(String.format(
-                "Cannot create BootstrapMetadata with a non-KRaft metadata version %s. Minimum version is %s",
-                metadataVersion, MetadataVersion.MINIMUM_KRAFT_VERSION));
-        }
-        records.add(new ApiMessageAndVersion(
-            new FeatureLevelRecord()
-                .setName(MetadataVersion.FEATURE_NAME)
-                .setFeatureLevel(metadataVersion.featureLevel()),
-            FeatureLevelRecord.LOWEST_SUPPORTED_VERSION));
-
-        return new BootstrapMetadata(metadataVersion, records);
-    }
-
-    /**
-     * Load a bootstrap snapshot into a read-only bootstrap metadata object and return it.
-     *
-     * @param bootstrapDir              The directory from which to read the snapshot file.
-     * @param fallbackVersionSupplier   A function that returns the metadata.version to use when upgrading from an older KRaft
-     * @return                          The read-only bootstrap metadata
-     * @throws Exception
-     */
-    public static BootstrapMetadata load(Path bootstrapDir, Supplier<MetadataVersion> fallbackVersionSupplier) throws Exception {
-        final Path bootstrapPath = bootstrapDir.resolve(BOOTSTRAP_FILE);
-
-        if (!Files.exists(bootstrapPath)) {
-            // Upgrade scenario from KRaft prior to 3.3 (i.e., no bootstrap metadata present)
-            MetadataVersion fallbackVersion = fallbackVersionSupplier.get();
-            if (fallbackVersion.isKRaftSupported()) {
-                log.debug("Missing bootstrap file, this appears to be a KRaft cluster older than 3.3. Setting metadata.version to {}.",
-                    fallbackVersion.featureLevel());
-                return BootstrapMetadata.create(fallbackVersion);
-            } else {
-                throw new Exception(String.format("Could not set fallback bootstrap metadata with non-KRaft metadata version of %s", fallbackVersion));
-            }
-        }
-
-        BootstrapListener listener = new BootstrapListener();
-        try (SnapshotFileReader reader = new SnapshotFileReader(bootstrapPath.toString(), listener)) {
-            reader.startup();
-            reader.caughtUpFuture().get();
-        } catch (ExecutionException e) {
-            throw new Exception("Failed to load snapshot", e.getCause());
-        }
-
-        Optional<FeatureLevelRecord> metadataVersionRecord = listener.records.stream()
-            .flatMap(message -> {
-                MetadataRecordType type = MetadataRecordType.fromId(message.message().apiKey());
-                if (!type.equals(MetadataRecordType.FEATURE_LEVEL_RECORD)) {
-                    return Stream.empty();
-                }
-                FeatureLevelRecord record = (FeatureLevelRecord) message.message();
-                if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
-                    return Stream.of(record);
-                } else {
-                    return Stream.empty();
-                }
-            })
-            .findFirst();
-
-        if (metadataVersionRecord.isPresent()) {
-            return new BootstrapMetadata(MetadataVersion.fromFeatureLevel(metadataVersionRecord.get().featureLevel()), listener.records);
-        } else {
-            throw new Exception("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
-        }
-    }
-
-    /**
-     * Write a set of bootstrap metadata to the bootstrap snapshot in a given directory
-     *
-     * @param metadata      The metadata to persist
-     * @param bootstrapDir  The directory in which to create the bootstrap snapshot
-     * @throws IOException
-     */
-    public static void write(BootstrapMetadata metadata, Path bootstrapDir) throws IOException {
-        final Path bootstrapPath = bootstrapDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE);
-        if (Files.exists(bootstrapPath)) {
-            throw new IOException("Cannot write metadata bootstrap file " + bootstrapPath +
-                ". File already already exists.");
-        }
-        try (SnapshotFileWriter bootstrapWriter = SnapshotFileWriter.open(bootstrapPath)) {
-            bootstrapWriter.append(metadata.records());
-        }
-    }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index d30f4324217..5bc5bd2c3af 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -353,19 +353,16 @@ public class ClusterControlManager {
                 setSecurityProtocol(listener.securityProtocol()));
         }
         for (BrokerRegistrationRequestData.Feature feature : request.features()) {
-            Optional<Short> finalized = finalizedFeatures.get(feature.name());
-            if (finalized.isPresent()) {
-                if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized.get())) {
-                    throw new UnsupportedVersionException("Unable to register because " +
-                            "the broker has an unsupported version of " + feature.name());
-                }
-            } else {
-                log.warn("Broker registered with feature {} that is unknown to the controller", feature.name());
-            }
-            record.features().add(new BrokerFeature().
-                setName(feature.name()).
-                setMinSupportedVersion(feature.minSupportedVersion()).
-                setMaxSupportedVersion(feature.maxSupportedVersion()));
+            record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
+        }
+        if (request.features().find(MetadataVersion.FEATURE_NAME) == null) {
+            // Brokers that don't send a supported metadata.version range are assumed to only
+            // support the original metadata.version.
+            record.features().add(processRegistrationFeature(brokerId, finalizedFeatures,
+                    new BrokerRegistrationRequestData.Feature().
+                            setName(MetadataVersion.FEATURE_NAME).
+                            setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()).
+                            setMaxSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel())));
         }
 
         heartbeatManager.register(brokerId, record.fenced());
@@ -376,6 +373,29 @@ public class ClusterControlManager {
         return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
+    BrokerFeature processRegistrationFeature(
+        int brokerId,
+        FinalizedControllerFeatures finalizedFeatures,
+        BrokerRegistrationRequestData.Feature feature
+    ) {
+        Optional<Short> finalized = finalizedFeatures.get(feature.name());
+        if (finalized.isPresent()) {
+            if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized.get())) {
+                throw new UnsupportedVersionException("Unable to register because the broker " +
+                    "does not support version " + finalized.get() + " of " + feature.name() +
+                        ". It wants a version between " + feature.minSupportedVersion() + " and " +
+                        feature.maxSupportedVersion() + ", inclusive.");
+            }
+        } else {
+            log.warn("Broker {} registered with feature {} that is unknown to the controller",
+                    brokerId, feature.name());
+        }
+        return new BrokerFeature().
+                setName(feature.name()).
+                setMinSupportedVersion(feature.minSupportedVersion()).
+                setMaxSupportedVersion(feature.maxSupportedVersion());
+    }
+
     public OptionalLong registerBrokerRecordOffset(int brokerId) {
         if (registerBrokerRecordOffsets.containsKey(brokerId)) {
             return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index c092abcdcca..46f3be91b60 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -52,7 +52,8 @@ public class FeatureControlManager {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
         private QuorumFeatures quorumFeatures = null;
-        private MetadataVersion metadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION;
+        private MetadataVersion metadataVersion = MetadataVersion.latest();
+        private MetadataVersion minimumBootstrapVersion = MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;
 
         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -74,6 +75,11 @@ public class FeatureControlManager {
             return this;
         }
 
+        Builder setMinimumBootstrapVersion(MetadataVersion minimumBootstrapVersion) {
+            this.minimumBootstrapVersion = minimumBootstrapVersion;
+            return this;
+        }
+
         public FeatureControlManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
@@ -84,7 +90,8 @@ public class FeatureControlManager {
             return new FeatureControlManager(logContext,
                 quorumFeatures,
                 snapshotRegistry,
-                metadataVersion);
+                metadataVersion,
+                minimumBootstrapVersion);
         }
     }
 
@@ -106,21 +113,22 @@ public class FeatureControlManager {
     private final TimelineObject<MetadataVersion> metadataVersion;
 
     /**
-     * A boolean to see if we have encountered a metadata.version or not.
+     * The minimum bootstrap version. The metadata.version cannot be set to anything lower than this.
      */
-    private final TimelineObject<Boolean> sawMetadataVersion;
+    private final MetadataVersion minimumBootstrapVersion;
 
     private FeatureControlManager(
         LogContext logContext,
         QuorumFeatures quorumFeatures,
         SnapshotRegistry snapshotRegistry,
-        MetadataVersion metadataVersion
+        MetadataVersion metadataVersion,
+        MetadataVersion minimumBootstrapVersion
     ) {
         this.log = logContext.logger(FeatureControlManager.class);
         this.quorumFeatures = quorumFeatures;
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
         this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
-        this.sawMetadataVersion = new TimelineObject<>(snapshotRegistry, false);
+        this.minimumBootstrapVersion = minimumBootstrapVersion;
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
@@ -159,11 +167,11 @@ public class FeatureControlManager {
                 "The controller does not support the given upgrade type.");
         }
 
-        final Short currentVersion;
+        final short currentVersion;
         if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
             currentVersion = metadataVersion.get().featureLevel();
         } else {
-            currentVersion = finalizedVersions.get(featureName);
+            currentVersion = finalizedVersions.getOrDefault(featureName, (short) 0);
         }
 
         if (newVersion < 0) {
@@ -188,12 +196,16 @@ public class FeatureControlManager {
             }
         }
 
-        if (currentVersion != null && newVersion < currentVersion) {
+        if (newVersion < currentVersion) {
             if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
                 return invalidUpdateVersion(featureName, newVersion,
                     "Can't downgrade the version of this feature without setting the " +
                     "upgrade type to either safe or unsafe downgrade.");
             }
+        } else if (newVersion > currentVersion) {
+            if (!upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
+                return invalidUpdateVersion(featureName, newVersion, "Can't downgrade to a newer version.");
+            }
         }
 
         if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
@@ -231,6 +243,12 @@ public class FeatureControlManager {
             return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
         }
 
+        // We cannot set a version earlier than IBP_3_3_IV0, since that was the first version that contained
+        // FeatureLevelRecord itself.
+        if (newVersion.isLessThan(minimumBootstrapVersion)) {
+            return invalidMetadataVersion(newVersionLevel, "Unable to set a metadata.version less than " +
+                    minimumBootstrapVersion);
+        }
         if (newVersion.isLessThan(currentVersion)) {
             // This is a downgrade
             boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
@@ -269,13 +287,6 @@ public class FeatureControlManager {
         return new FinalizedControllerFeatures(features, epoch);
     }
 
-    /**
-     * @return true if a FeatureLevelRecord for "metadata.version" has been replayed. False otherwise
-     */
-    boolean sawMetadataVersion() {
-        return this.sawMetadataVersion.get();
-    }
-
     public void replay(FeatureLevelRecord record) {
         VersionRange range = quorumFeatures.localSupportedFeature(record.name());
         if (!range.contains(record.featureLevel())) {
@@ -285,7 +296,6 @@ public class FeatureControlManager {
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
             log.info("Setting metadata.version to {}", record.featureLevel());
             metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
-            sawMetadataVersion.set(true);
         } else {
             if (record.featureLevel() == 0) {
                 log.info("Removing feature {}", record.name());
@@ -316,10 +326,12 @@ public class FeatureControlManager {
         public List<ApiMessageAndVersion> next() {
             // Write the metadata.version first
             if (!wroteVersion) {
-                wroteVersion = true;
-                return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
-                    .setName(MetadataVersion.FEATURE_NAME)
-                    .setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+                if (metadataVersion.isAtLeast(minimumBootstrapVersion)) {
+                    wroteVersion = true;
+                    return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
+                            .setName(MetadataVersion.FEATURE_NAME)
+                            .setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+                }
             }
             // Then write the rest of the features
             if (!hasNext()) throw new NoSuchElementException();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java b/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
new file mode 100644
index 00000000000..2e29a2a52f9
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+
+/**
+ * The LogReplayTracker manages state associated with replaying the metadata log. Currently, it
+ * only tracks whether any records have been replayed yet. It is accessed solely from the quorum
+ * controller thread.
+ */
+public class LogReplayTracker {
+    public static class Builder {
+        private LogContext logContext = null;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        public LogReplayTracker build() {
+            if (logContext == null) logContext = new LogContext();
+            return new LogReplayTracker(logContext);
+        }
+    }
+
+    /**
+     * The slf4j logger.
+     */
+    private final Logger log;
+
+    /**
+     * True if we haven't replayed any records yet.
+     */
+    private boolean empty;
+
+    private LogReplayTracker(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(LogReplayTracker.class);
+        resetToEmpty();
+    }
+
+    void resetToEmpty() {
+        this.empty = true;
+    }
+
+    boolean empty() {
+        return empty;
+    }
+
+    void replay(ApiMessage message) {
+        empty = false;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index dcb9ab77fa6..e669817dd21 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -78,6 +78,7 @@ import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.placement.ReplicaPlacer;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
 import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
@@ -100,6 +101,7 @@ import org.apache.kafka.snapshot.SnapshotWriter;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -658,6 +660,16 @@ public final class QuorumController implements Controller {
         return clusterControl;
     }
 
+    // Visible for testing
+    FeatureControlManager featureControl() {
+        return featureControl;
+    }
+
+    // Visible for testing
+    ConfigurationControlManager configurationControl() {
+        return configurationControl;
+    }
+
     <T> CompletableFuture<T> appendReadEvent(
         String name,
         OptionalLong deadlineNs,
@@ -832,13 +844,6 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private <T> CompletableFuture<T> prependWriteEvent(String name,
-                                                       ControllerWriteOperation<T> op) {
-        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
-        queue.prepend(event);
-        return event.future();
-    }
-
     <T> CompletableFuture<T> appendWriteEvent(String name,
                                               OptionalLong deadlineNs,
                                               ControllerWriteOperation<T> op) {
@@ -858,13 +863,14 @@ public final class QuorumController implements Controller {
                 try {
                     maybeCompleteAuthorizerInitialLoad();
                     long processedRecordsSize = 0;
+                    boolean isActive = isActiveController();
                     while (reader.hasNext()) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         int epoch = batch.epoch();
                         List<ApiMessageAndVersion> messages = batch.records();
 
-                        if (isActiveController()) {
+                        if (isActive) {
                             // If the controller is active, the records were already replayed,
                             // so we don't need to do it here.
                             log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
@@ -928,7 +934,7 @@ public final class QuorumController implements Controller {
                     log.info("Starting to replay snapshot ({}), from last commit offset ({}) and epoch ({})",
                         reader.snapshotId(), lastCommittedOffset, lastCommittedEpoch);
 
-                    resetState();
+                    resetToEmptyState();
 
                     while (reader.hasNext()) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
@@ -967,6 +973,7 @@ public final class QuorumController implements Controller {
                         reader.lastContainedLogTimestamp()
                     );
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+                    newBytesSinceLastSnapshot = 0L;
                     authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
                 } finally {
                     reader.close();
@@ -976,97 +983,31 @@ public final class QuorumController implements Controller {
 
         @Override
         public void handleLeaderChange(LeaderAndEpoch newLeader) {
-            if (newLeader.isLeader(nodeId)) {
-                final int newEpoch = newLeader.epoch();
-                appendRaftEvent("handleLeaderChange[" + newEpoch + "]", () -> {
-                    int curEpoch = curClaimEpoch;
-                    if (curEpoch != -1) {
-                        throw new RuntimeException("Tried to claim controller epoch " +
-                            newEpoch + ", but we never renounced controller epoch " +
-                            curEpoch);
-                    }
-
-                    curClaimEpoch = newEpoch;
-                    controllerMetrics.setActive(true);
-                    updateWriteOffset(lastCommittedOffset);
-                    clusterControl.activate();
-
-                    // Check if we need to bootstrap metadata into the log. This must happen before we can
-                    // write any other records to the log since we need the metadata.version to determine the correct
-                    // record version
-                    final MetadataVersion metadataVersion;
-                    if (!featureControl.sawMetadataVersion()) {
-                        final CompletableFuture<Map<String, ApiError>> future;
-                        if (!bootstrapMetadata.metadataVersion().isKRaftSupported()) {
-                            metadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION;
-                            future = new CompletableFuture<>();
-                            future.completeExceptionally(
-                                new IllegalStateException("Cannot become leader without a KRaft supported version. " +
-                                    "Got " + bootstrapMetadata.metadataVersion()));
-                        } else {
-                            metadataVersion = bootstrapMetadata.metadataVersion();
-
-                            // This call is here instead of inside the appendWriteEvent for testing purposes.
-                            final List<ApiMessageAndVersion> bootstrapRecords = bootstrapMetadata.records();
-
-                            // We prepend the bootstrap event in order to ensure the bootstrap metadata is written before
-                            // any external controller write events are processed.
-                            future = prependWriteEvent("bootstrapMetadata", () -> {
-                                if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
-                                    log.info("Initializing metadata.version to {}", metadataVersion.featureLevel());
-                                } else {
-                                    log.info("Upgrading KRaft cluster and initializing metadata.version to {}",
-                                        metadataVersion.featureLevel());
-                                }
-                                return ControllerResult.atomicOf(bootstrapRecords, null);
-                            });
-                        }
-                        future.whenComplete((result, exception) -> {
-                            if (exception != null) {
-                                log.error("Failed to bootstrap metadata.", exception);
-                                appendRaftEvent("bootstrapMetadata[" + curClaimEpoch + "]", () -> {
-                                    if (isActiveController()) {
-                                        log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
-                                                        "metadata. Reverting to last committed offset {}.",
-                                                curClaimEpoch, lastCommittedOffset);
-                                        renounce();
-                                    } else {
-                                        log.warn("Unable to bootstrap metadata on standby controller.");
-                                    }
-                                });
-                            }
-                        });
+            appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> {
+                final String newLeaderName = newLeader.leaderId().isPresent() ?
+                        String.valueOf(newLeader.leaderId().getAsInt()) : "(none)";
+                if (isActiveController()) {
+                    if (newLeader.isLeader(nodeId)) {
+                        log.warn("We were the leader in epoch {}, and are still the leader " +
+                                "in the new epoch {}.", curClaimEpoch, newLeader.epoch());
+                        curClaimEpoch = newLeader.epoch();
                     } else {
-                        metadataVersion = featureControl.metadataVersion();
-                    }
-
-                    log.info(
-                        "Becoming the active controller at epoch {}, committed offset {}, committed epoch {}, and metadata.version {}",
-                        newEpoch, lastCommittedOffset, lastCommittedEpoch, metadataVersion.featureLevel()
-                    );
-
-                    // Before switching to active, create an in-memory snapshot at the last committed offset. This is
-                    // required because the active controller assumes that there is always an in-memory snapshot at the
-                    // last committed offset.
-                    snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-
-                    // When becoming the active controller, schedule a leader rebalance if there are any topic partition
-                    // with leader that is not the preferred leader.
-                    maybeScheduleNextBalancePartitionLeaders();
-
-                    // When becoming leader schedule periodic write of the no op record
-                    maybeScheduleNextWriteNoOpRecord();
-                });
-            } else if (isActiveController()) {
-                appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
-                    if (isActiveController()) {
-                        log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
-                                "log event. Reverting to last committed offset {}.", curClaimEpoch,
-                                lastCommittedOffset);
+                        log.warn("Renouncing the leadership due to a metadata log event. " +
+                            "We were the leader at epoch {}, but in the new epoch {}, " +
+                            "the leader is {}. Reverting to last committed offset {}.",
+                            curClaimEpoch, newLeader.epoch(), newLeaderName, lastCommittedOffset);
                         renounce();
                     }
-                });
-            }
+                } else if (newLeader.isLeader(nodeId)) {
+                    log.info("Becoming the active controller at epoch {}, committed offset {}, " +
+                        "committed epoch {}", newLeader.epoch(), lastCommittedOffset,
+                        lastCommittedEpoch);
+                    claim(newLeader.epoch());
+                } else {
+                    log.info("In the new epoch {}, the leader is {}.",
+                        newLeader.epoch(), newLeaderName);
+                }
+            });
         }
 
         @Override
@@ -1128,6 +1069,62 @@ public final class QuorumController implements Controller {
         }
     }
 
+    private void claim(int epoch) {
+        try {
+            if (curClaimEpoch != -1) {
+                throw new RuntimeException("Cannot claim leadership because we are already the " +
+                        "active controller.");
+            }
+            curClaimEpoch = epoch;
+            controllerMetrics.setActive(true);
+            updateWriteOffset(lastCommittedOffset);
+            clusterControl.activate();
+
+            // Before switching to active, create an in-memory snapshot at the last committed
+            // offset. This is required because the active controller assumes that there is always
+            // an in-memory snapshot at the last committed offset.
+            snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+            // Prepend the activate event. It is important that this event go at the beginning
+            // of the queue rather than the end (hence prepend rather than append). It's also
+            // important not to use prepend for anything else, to preserve the ordering here.
+            queue.prepend(new ControllerWriteEvent<>("completeActivation[" + epoch + "]",
+                    new CompleteActivationEvent()));
+        } catch (Throwable e) {
+            fatalFaultHandler.handleFault("exception while claiming leadership", e);
+        }
+    }
+
+    class CompleteActivationEvent implements ControllerWriteOperation<Void> {
+        @Override
+        public ControllerResult<Void> generateRecordsAndResult() throws Exception {
+            List<ApiMessageAndVersion> records = new ArrayList<>();
+            if (logReplayTracker.empty()) {
+                // If no records have been replayed, we need to write out the bootstrap records.
+                // This will include the new metadata.version, as well as things like SCRAM
+                // initialization, etc.
+                log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " +
+                        "at metadata.version {} from {}.", bootstrapMetadata.records().size(),
+                        bootstrapMetadata.metadataVersion(), bootstrapMetadata.source());
+                records.addAll(bootstrapMetadata.records());
+            } else if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) {
+                log.info("No metadata.version feature level record was found in the log. " +
+                        "Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION);
+            }
+            return ControllerResult.atomicOf(records, null);
+        }
+
+        @Override
+        public void processBatchEndOffset(long offset) {
+            // As part of completing our transition to active controller, we reschedule the
+            // periodic tasks here. At this point, all the records we generated in
+            // generateRecordsAndResult have been applied, so we have the correct value for
+            // metadata.version and other in-memory state.
+            maybeScheduleNextBalancePartitionLeaders();
+            maybeScheduleNextWriteNoOpRecord();
+        }
+    }
+
     private void updateLastCommittedState(long offset, int epoch, long timestamp) {
         lastCommittedOffset = offset;
         lastCommittedEpoch = epoch;
@@ -1152,15 +1149,19 @@ public final class QuorumController implements Controller {
             purgatory.failAll(newNotControllerException());
 
             if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-                newBytesSinceLastSnapshot = 0;
                 snapshotRegistry.revertToSnapshot(lastCommittedOffset);
                 authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
             } else {
-                resetState();
+                log.info("Unable to find last committed offset {} in snapshot registry; resetting " +
+                         "to empty state.", lastCommittedOffset);
+                resetToEmptyState();
+                authorizer.ifPresent(a -> a.loadSnapshot(Collections.emptyMap()));
+                needToCompleteAuthorizerLoad = authorizer.isPresent();
                 raftClient.unregister(metaLogListener);
                 metaLogListener = new QuorumMetaLogListener();
                 raftClient.register(metaLogListener);
             }
+            newBytesSinceLastSnapshot = 0L;
             updateWriteOffset(-1);
             clusterControl.deactivate();
             cancelMaybeFenceReplicas();
@@ -1325,6 +1326,7 @@ public final class QuorumController implements Controller {
      *                          if this record is from a snapshot, this is used along with RegisterBrokerRecord
      */
     private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
+        logReplayTracker.replay(message);
         MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
         switch (type) {
             case REGISTER_BROKER_RECORD:
@@ -1404,7 +1406,7 @@ public final class QuorumController implements Controller {
     /**
      * Clear all data structures and reset all KRaft state.
      */
-    private void resetState() {
+    private void resetToEmptyState() {
         snapshotGeneratorManager.cancel();
         snapshotRegistry.reset();
 
@@ -1522,6 +1524,12 @@ public final class QuorumController implements Controller {
      */
     private final AclControlManager aclControlManager;
 
+    /**
+     * Tracks replaying the log.
+     * This must be accessed only by the event queue thread.
+     */
+    private final LogReplayTracker logReplayTracker;
+
     /**
      * Manages generating controller snapshots.
      */
@@ -1612,6 +1620,9 @@ public final class QuorumController implements Controller {
      */
     private boolean noOpRecordScheduled = false;
 
+    /**
+     * The bootstrap metadata to use for initialization if needed.
+     */
     private final BootstrapMetadata bootstrapMetadata;
 
     private QuorumController(
@@ -1667,6 +1678,12 @@ public final class QuorumController implements Controller {
             setLogContext(logContext).
             setQuorumFeatures(quorumFeatures).
             setSnapshotRegistry(snapshotRegistry).
+            // Set the default metadata version to the minimum KRaft version. This only really
+            // matters if we are upgrading from a version that didn't store metadata.version in
+            // the log, such as one of the pre-production 3.0, 3.1, or 3.2 versions. Those versions
+            // are all treated as 3.0IV1. In newer versions the metadata.version will be specified
+            // by the log.
+            setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION).
             build();
         this.clusterControl = new ClusterControlManager.Builder().
             setLogContext(logContext).
@@ -1697,6 +1714,9 @@ public final class QuorumController implements Controller {
         this.authorizer = authorizer;
         authorizer.ifPresent(a -> a.setAclMutator(this));
         this.aclControlManager = new AclControlManager(snapshotRegistry, authorizer);
+        this.logReplayTracker = new LogReplayTracker.Builder().
+                setLogContext(logContext).
+                build();
         this.raftClient = raftClient;
         this.bootstrapMetadata = bootstrapMetadata;
         this.metaLogListener = new QuorumMetaLogListener();
@@ -1704,7 +1724,7 @@ public final class QuorumController implements Controller {
         this.needToCompleteAuthorizerLoad = authorizer.isPresent();
         updateWriteOffset(-1);
 
-        resetState();
+        resetToEmptyState();
 
         log.info("Creating new QuorumController with clusterId {}, authorizer {}.", clusterId, authorizer);
 
@@ -1892,7 +1912,12 @@ public final class QuorumController implements Controller {
 
                 @Override
                 public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
-                    OptionalLong offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
+                    // Get the offset of the broker registration. Note: although the offset
+                    // we get back here could be the offset for a previous epoch of the
+                    // broker registration, we will check the broker epoch in
+                    // processBrokerHeartbeat, which covers that case.
+                    OptionalLong offsetForRegisterBrokerRecord =
+                            clusterControl.registerBrokerRecordOffset(brokerId);
                     if (!offsetForRegisterBrokerRecord.isPresent()) {
                         throw new StaleBrokerEpochException(
                             String.format("Receive a heartbeat from broker %d before registration", brokerId));
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index 4cfb1260f1d..75e591f3104 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -67,10 +67,12 @@ public final class FeaturesImage {
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
         List<ApiMessageAndVersion> batch = new ArrayList<>();
-        // Write out the metadata.version record first, and then the rest of the finalized features
-        batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-            setName(MetadataVersion.FEATURE_NAME).
-            setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+        if (!metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+            // Write out the metadata.version record first, and then the rest of the finalized features
+            batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+                    setName(MetadataVersion.FEATURE_NAME).
+                    setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+        }
 
         for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
             if (entry.getKey().equals(MetadataVersion.FEATURE_NAME)) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java
new file mode 100644
index 00000000000..0da74a77d06
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.metadata.util.BatchFileReader;
+import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
+import org.apache.kafka.metadata.util.BatchFileWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;
+
+
+/**
+ * Reads and writes the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the
+ * format is the same as a KRaft snapshot.
+ */
+public class BootstrapDirectory {
+    public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
+
+    private final String directoryPath;
+    private final Optional<String> ibp;
+
+    /**
+     * Create a new BootstrapDirectory object.
+     *
+     * @param directoryPath     The path to the directory with the bootstrap file.
+     * @param ibp               The configured value of inter.broker.protocol, or an empty
+     *                          Optional if it is not configured.
+     */
+    public BootstrapDirectory(
+        String directoryPath,
+        Optional<String> ibp
+    ) {
+        this.directoryPath = Objects.requireNonNull(directoryPath);
+        this.ibp = Objects.requireNonNull(ibp);
+    }
+
+    public BootstrapMetadata read() throws Exception {
+        if (!Files.isDirectory(Paths.get(directoryPath))) {
+            if (Files.exists(Paths.get(directoryPath))) {
+                throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
+                        "a directory.");
+            } else {
+                throw new RuntimeException("No such directory as " + directoryPath);
+            }
+        }
+        Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
+        if (!Files.exists(binaryBootstrapPath)) {
+            return readFromConfiguration();
+        } else {
+            return readFromBinaryFile(binaryBootstrapPath.toString());
+        }
+    }
+
+    BootstrapMetadata readFromConfiguration() {
+        if (!ibp.isPresent()) {
+            return BootstrapMetadata.fromVersion(MetadataVersion.latest(), "the default bootstrap");
+        }
+        MetadataVersion version = MetadataVersion.fromVersionString(ibp.get());
+        if (version.isLessThan(MINIMUM_BOOTSTRAP_VERSION)) {
+            return BootstrapMetadata.fromVersion(MINIMUM_BOOTSTRAP_VERSION,
+                "the minimum version bootstrap with metadata.version " + MINIMUM_BOOTSTRAP_VERSION);
+        }
+        return BootstrapMetadata.fromVersion(version,
+            "the configured bootstrap with metadata.version " + version);
+    }
+
+    BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        try (BatchFileReader reader = new BatchFileReader.Builder().
+                setPath(binaryPath).build()) {
+            while (reader.hasNext()) {
+                BatchAndType batchAndType = reader.next();
+                if (!batchAndType.isControl()) {
+                    records.addAll(batchAndType.batch().records());
+                }
+            }
+        }
+        return BootstrapMetadata.fromRecords(Collections.unmodifiableList(records),
+                "the binary bootstrap metadata file: " + binaryPath);
+    }
+
+    public void writeBinaryFile(BootstrapMetadata bootstrapMetadata) throws Exception {
+        if (!Files.isDirectory(Paths.get(directoryPath))) {
+            throw new RuntimeException("No such directory as " + directoryPath);
+        }
+        Path tempPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME + ".tmp");
+        Files.deleteIfExists(tempPath);
+        try (BatchFileWriter writer = BatchFileWriter.open(tempPath)) {
+            for (ApiMessageAndVersion message : bootstrapMetadata.records()) {
+                writer.append(message);
+            }
+        }
+        Files.move(tempPath, Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME),
+                ATOMIC_MOVE, REPLACE_EXISTING);
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
new file mode 100644
index 00000000000..53c6de27bc6
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;
+
+
+/**
+ * The bootstrap metadata. On startup, if the metadata log is empty, we will populate the log with
+ * these records. Alternatively, if the log is not empty, but the metadata version is not set, we will
+ * use the version specified here.
+ */
+public class BootstrapMetadata {
+    private final List<ApiMessageAndVersion> records;
+    private final MetadataVersion metadataVersion;
+    private final String source;
+
+    public static BootstrapMetadata fromVersion(MetadataVersion metadataVersion, String source) {
+        List<ApiMessageAndVersion> records = Collections.singletonList(
+            new ApiMessageAndVersion(new FeatureLevelRecord().
+                setName(MetadataVersion.FEATURE_NAME).
+                setFeatureLevel(metadataVersion.featureLevel()), (short) 0));
+        return new BootstrapMetadata(records, metadataVersion, source);
+    }
+
+    public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion> records, String source) {
+        MetadataVersion metadataVersion = null;
+        for (ApiMessageAndVersion record : records) {
+            Optional<MetadataVersion> version = recordToMetadataVersion(record.message());
+            if (version.isPresent()) {
+                metadataVersion = version.get();
+            }
+        }
+        if (metadataVersion == null) {
+            throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME +
+                    " was found in the bootstrap metadata from " + source);
+        }
+        return new BootstrapMetadata(records, metadataVersion, source);
+    }
+
+    public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage record) {
+        if (record instanceof FeatureLevelRecord) {
+            FeatureLevelRecord featureLevel = (FeatureLevelRecord) record;
+            if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) {
+                return Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel()));
+            }
+        }
+        return Optional.empty();
+    }
+
+    BootstrapMetadata(
+        List<ApiMessageAndVersion> records,
+        MetadataVersion metadataVersion,
+        String source
+    ) {
+        this.records = Objects.requireNonNull(records);
+        if (metadataVersion.isLessThan(MINIMUM_BOOTSTRAP_VERSION)) {
+            throw new RuntimeException("Bootstrap metadata versions before " +
+                    MINIMUM_BOOTSTRAP_VERSION + " are not supported. Can't load metadata from " +
+                    source);
+        }
+        this.metadataVersion = metadataVersion;
+        Objects.requireNonNull(source);
+        this.source = source;
+    }
+
+    public List<ApiMessageAndVersion> records() {
+        return records;
+    }
+
+    public MetadataVersion metadataVersion() {
+        return metadataVersion;
+    }
+
+    public String source() {
+        return source;
+    }
+
+    public BootstrapMetadata copyWithOnlyVersion() {
+        ApiMessageAndVersion versionRecord = null;
+        for (ApiMessageAndVersion record : records) {
+            if (recordToMetadataVersion(record.message()).isPresent()) {
+                versionRecord = record;
+            }
+        }
+        if (versionRecord == null) {
+            throw new RuntimeException("No FeatureLevelRecord for " + MetadataVersion.FEATURE_NAME +
+                    " was found in " + source);
+        }
+        return new BootstrapMetadata(Collections.singletonList(versionRecord),
+                metadataVersion, source);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(records, metadataVersion, source);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !o.getClass().equals(this.getClass())) return false;
+        BootstrapMetadata other = (BootstrapMetadata) o;
+        return Objects.equals(records, other.records) &&
+            metadataVersion.equals(other.metadataVersion) &&
+            source.equals(other.source);
+    }
+
+    @Override
+    public String toString() {
+        return "BootstrapMetadata(records=" + records.toString() +
+            ", metadataVersion=" + metadataVersion +
+            ", source=" + source +
+            ")";
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
new file mode 100644
index 00000000000..e82d37cf78d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.MetadataRecordSerde;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+
+/**
+ * Reads a log file containing KRaft record batches.
+ */
+public final class BatchFileReader implements Iterator<BatchFileReader.BatchAndType>, AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(BatchFileReader.class);
+
+    public static class Builder {
+        private String path = null;
+
+        public Builder setPath(String path) {
+            this.path = Objects.requireNonNull(path);
+            return this;
+        }
+
+        public BatchFileReader build() throws Exception {
+            if (path == null) {
+                throw new RuntimeException("You must specify a path.");
+            }
+            FileRecords fileRecords = FileRecords.open(new File(path), false);
+            try {
+                return new BatchFileReader(fileRecords);
+            } catch (Throwable e) {
+                Utils.closeQuietly(fileRecords, "fileRecords");
+                throw e;
+            }
+        }
+    }
+
+    public static class BatchAndType {
+        private final Batch<ApiMessageAndVersion> batch;
+        private final boolean isControl;
+
+        public BatchAndType(Batch<ApiMessageAndVersion> batch, boolean isControl) {
+            this.batch = batch;
+            this.isControl = isControl;
+        }
+
+        public Batch<ApiMessageAndVersion> batch() {
+            return batch;
+        }
+
+        public boolean isControl() {
+            return isControl;
+        }
+    }
+
+    private final FileRecords fileRecords;
+    private Iterator<FileChannelRecordBatch> batchIterator;
+    private final MetadataRecordSerde serde;
+
+    private BatchFileReader(FileRecords fileRecords) {
+        this.fileRecords = fileRecords;
+        this.batchIterator = fileRecords.batchIterator();
+        this.serde = new MetadataRecordSerde();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return this.batchIterator.hasNext();
+    }
+
+    @Override
+    public BatchAndType next() {
+        FileChannelRecordBatch input = batchIterator.next();
+        if (input.isControlBatch()) {
+            return nextControlBatch(input);
+        } else {
+            return nextDataBatch(input);
+        }
+    }
+
+    private BatchAndType nextControlBatch(FileChannelRecordBatch input) {
+        List<ApiMessageAndVersion> messages = new ArrayList<>();
+        for (Iterator<Record> iter = input.iterator(); iter.hasNext(); ) {
+            Record record = iter.next();
+            try {
+                short typeId = ControlRecordType.parseTypeId(record.key());
+                ControlRecordType type = ControlRecordType.fromTypeId(typeId);
+                switch (type) {
+                    case LEADER_CHANGE: {
+                        LeaderChangeMessage message = new LeaderChangeMessage();
+                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 0));
+                        break;
+                    }
+                    case SNAPSHOT_HEADER: {
+                        SnapshotHeaderRecord message = new SnapshotHeaderRecord();
+                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 0));
+                        break;
+                    }
+                    case SNAPSHOT_FOOTER: {
+                        SnapshotFooterRecord message = new SnapshotFooterRecord();
+                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 0));
+                        break;
+                    }
+                    default:
+                        throw new RuntimeException("Unsupported control record type " + type + " at offset " +
+                                record.offset());
+                }
+            } catch (Throwable e) {
+                throw new RuntimeException("Unable to read control record at offset " + record.offset(), e);
+            }
+        }
+        return new BatchAndType(Batch.data(
+            input.baseOffset(),
+            input.partitionLeaderEpoch(),
+            input.maxTimestamp(),
+            input.sizeInBytes(),
+            messages), true);
+    }
+
+    private BatchAndType nextDataBatch(FileChannelRecordBatch input) {
+        List<ApiMessageAndVersion> messages = new ArrayList<>();
+        for (Record record : input) {
+            try {
+                ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
+                ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
+                messages.add(messageAndVersion);
+            } catch (Throwable e) {
+                throw new RuntimeException("unable to deserialize record at offset " + record.offset(), e);
+            }
+        }
+        return new BatchAndType(Batch.data(
+            input.baseOffset(),
+            input.partitionLeaderEpoch(),
+            input.maxTimestamp(),
+            input.sizeInBytes(),
+            messages), false);
+    }
+
+    @Override
+    public void close() {
+        try {
+            fileRecords.closeHandlers();
+        } catch (Exception e) {
+            log.error("Error closing fileRecords", e);
+        }
+        this.batchIterator = Collections.<FileChannelRecordBatch>emptyList().iterator();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
similarity index 85%
rename from metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileWriter.java
rename to metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
index 5608bdc464a..16b8c541b74 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
@@ -36,14 +36,14 @@ import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
 
 
 /**
- * Write an arbitrary set of metadata records into a Kafka metadata snapshot format. The resulting snapshot will be use
- * epoch of zero and an initial offset of zero. This class should not be used for creating actual metadata snapshots.
+ * Write an arbitrary set of metadata records into a Kafka metadata log batch format. This is similar to the binary
+ * format used for metadata snapshot files, but the log epoch and initial offset are set to zero.
  */
-public class SnapshotFileWriter implements AutoCloseable {
+public class BatchFileWriter implements AutoCloseable {
     private final FileChannel channel;
     private final BatchAccumulator<ApiMessageAndVersion> batchAccumulator;
 
-    SnapshotFileWriter(FileChannel channel, BatchAccumulator<ApiMessageAndVersion> batchAccumulator) {
+    BatchFileWriter(FileChannel channel, BatchAccumulator<ApiMessageAndVersion> batchAccumulator) {
         this.channel = channel;
         this.batchAccumulator = batchAccumulator;
     }
@@ -63,7 +63,7 @@ public class SnapshotFileWriter implements AutoCloseable {
         channel.close();
     }
 
-    public static SnapshotFileWriter open(Path snapshotPath) throws IOException {
+    public static BatchFileWriter open(Path snapshotPath) throws IOException {
         BatchAccumulator<ApiMessageAndVersion> batchAccumulator = new BatchAccumulator<>(
             0,
             0,
@@ -76,6 +76,6 @@ public class SnapshotFileWriter implements AutoCloseable {
 
         FileChannel channel = FileChannel.open(snapshotPath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
 
-        return new SnapshotFileWriter(channel, batchAccumulator);
+        return new BatchFileWriter(channel, batchAccumulator);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 997d49acb1b..49d62fd2117 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -168,7 +168,7 @@ public class SnapshotRegistry {
     public Snapshot getSnapshot(long epoch) {
         Snapshot snapshot = snapshots.get(epoch);
         if (snapshot == null) {
-            throw new RuntimeException("No snapshot for epoch " + epoch + ". Snapshot " +
+            throw new RuntimeException("No in-memory snapshot for epoch " + epoch + ". Snapshot " +
                 "epochs are: " + epochsList().stream().map(e -> e.toString()).
                     collect(Collectors.joining(", ")));
         }
@@ -186,7 +186,7 @@ public class SnapshotRegistry {
     public Snapshot getOrCreateSnapshot(long epoch) {
         Snapshot last = head.prev();
         if (last.epoch() > epoch) {
-            throw new RuntimeException("Can't create a new snapshot at epoch " + epoch +
+            throw new RuntimeException("Can't create a new in-memory snapshot at epoch " + epoch +
                 " because there is already a snapshot with epoch " + last.epoch());
         } else if (last.epoch() == epoch) {
             return last;
@@ -194,7 +194,7 @@ public class SnapshotRegistry {
         Snapshot snapshot = new Snapshot(epoch);
         last.appendNext(snapshot);
         snapshots.put(epoch, snapshot);
-        log.debug("Creating snapshot {}", epoch);
+        log.debug("Creating in-memory snapshot {}", epoch);
         return snapshot;
     }
 
@@ -209,7 +209,7 @@ public class SnapshotRegistry {
         iterator.next();
         while (iterator.hasNext()) {
             Snapshot snapshot = iterator.next();
-            log.debug("Deleting snapshot {} because we are reverting to {}",
+            log.debug("Deleting in-memory snapshot {} because we are reverting to {}",
                 snapshot.epoch(), targetEpoch);
             iterator.remove();
         }
@@ -252,7 +252,6 @@ public class SnapshotRegistry {
             if (snapshot.epoch() >= targetEpoch) {
                 return;
             }
-            log.debug("Deleting snapshot {}", snapshot.epoch());
             iterator.remove();
         }
     }
diff --git a/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json b/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
index 29cdd9b467f..b5e561512b8 100644
--- a/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
+++ b/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
@@ -14,6 +14,9 @@
 // limitations under the License.
 
 {
+  // Note: New metadata logs and snapshots begin with a FeatureLevelRecord which specifies the
+  // metadata level that is required to read them. The version of that record cannot advance
+  // beyond 0, for backwards compatibility reasons.
   "apiKey": 12,
   "type": "metadata",
   "name": "FeatureLevelRecord",
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
deleted file mode 100644
index f1577269196..00000000000
--- a/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.Random;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class BootstrapMetadataTest {
-    private Path tmpDir;
-
-    @BeforeEach
-    public void createTestDir() {
-        tmpDir = TestUtils.tempDirectory("BootstrapMetadataTest").toPath();
-    }
-
-    @AfterEach
-    public void deleteTestDir() throws IOException {
-        if (tmpDir != null)
-            Utils.delete(tmpDir.toFile());
-    }
-
-    @Test
-    public void testWriteAndReadBootstrapFile() throws Exception {
-        BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.MINIMUM_KRAFT_VERSION);
-        BootstrapMetadata.write(metadata, tmpDir);
-
-        assertTrue(Files.exists(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE)));
-
-        BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION);
-        assertEquals(metadata, newMetadata);
-    }
-
-    @Test
-    public void testNoBootstrapFile() throws Exception {
-        BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION);
-        assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, metadata.metadataVersion());
-        metadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.IBP_3_2_IV0);
-        assertEquals(MetadataVersion.IBP_3_2_IV0, metadata.metadataVersion());
-    }
-
-    @Test
-    public void testExistingBootstrapFile() throws Exception {
-        BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.MINIMUM_KRAFT_VERSION), tmpDir);
-        assertThrows(IOException.class, () -> {
-            BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_1_IV0), tmpDir);
-        });
-    }
-
-    @Test
-    public void testEmptyBootstrapFile() throws Exception {
-        Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
-        assertThrows(Exception.class, () -> BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION),
-            "Should fail to load if no metadata.version is set");
-    }
-
-    @Test
-    public void testGarbageBootstrapFile() throws Exception {
-        Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
-        Random random = new Random(1);
-        byte[] data = new byte[100];
-        random.nextBytes(data);
-        Files.write(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE), data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
-        assertThrows(Exception.class, () -> BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION),
-            "Should fail on invalid data");
-    }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index e47def81e6d..ceb12cc6254 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -17,18 +17,12 @@
 
 package org.apache.kafka.controller;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
-
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.BrokerRegistrationRequestData;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
@@ -46,6 +40,7 @@ import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.metadata.placement.ClusterDescriber;
 import org.apache.kafka.metadata.placement.PlacementSpec;
 import org.apache.kafka.metadata.placement.UsableBroker;
@@ -58,6 +53,13 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -310,6 +312,11 @@ public class ClusterControlManagerTest {
                 setRack(null).
                 setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
                 setFenced(true).
+                setFeatures(new RegisterBrokerRecord.BrokerFeatureCollection(Arrays.asList(
+                    new RegisterBrokerRecord.BrokerFeature().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setMinSupportedVersion((short) 1).
+                        setMaxSupportedVersion((short) 1)).iterator())).
                 setInControlledShutdown(false), expectedVersion)),
             result.records());
     }
@@ -488,4 +495,56 @@ public class ClusterControlManagerTest {
                 setFenced(true), expectedVersion))),
                 clusterControl.iterator(Long.MAX_VALUE));
     }
+
+
+    @Test
+    public void testRegistrationWithUnsupportedMetadataVersion() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+                setSnapshotRegistry(snapshotRegistry).
+                setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                        Collections.singletonMap(MetadataVersion.FEATURE_NAME, VersionRange.of(
+                                MetadataVersion.IBP_3_1_IV0.featureLevel(),
+                                MetadataVersion.IBP_3_3_IV0.featureLevel())),
+                        Collections.singletonList(0))).
+                setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
+                build();
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+                setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
+                setTime(new MockTime(0, 0, 0)).
+                setSnapshotRegistry(snapshotRegistry).
+                setControllerMetrics(new MockControllerMetrics()).
+                setFeatureControlManager(featureControl).
+                build();
+        clusterControl.activate();
+
+        assertEquals("Unable to register because the broker does not support version 4 of " +
+            "metadata.version. It wants a version between 1 and 1, inclusive.",
+            assertThrows(UnsupportedVersionException.class,
+                () -> clusterControl.registerBroker(
+                    new BrokerRegistrationRequestData().
+                        setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
+                        setBrokerId(0).
+                        setRack(null).
+                        setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+                    123L,
+                    featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
+
+        assertEquals("Unable to register because the broker does not support version 4 of " +
+            "metadata.version. It wants a version between 7 and 7, inclusive.",
+            assertThrows(UnsupportedVersionException.class,
+                () -> clusterControl.registerBroker(
+                    new BrokerRegistrationRequestData().
+                        setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
+                        setBrokerId(0).
+                        setRack(null).
+                        setFeatures(new BrokerRegistrationRequestData.FeatureCollection(
+                                Collections.singleton(new BrokerRegistrationRequestData.Feature().
+                                    setName(MetadataVersion.FEATURE_NAME).
+                                    setMinSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel()).
+                                    setMaxSupportedVersion(MetadataVersion.IBP_3_3_IV3.featureLevel())).iterator())).
+                        setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+                    123L,
+                    featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage());
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 4d4c4719945..5162d879a53 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 
@@ -91,9 +93,10 @@ public class FeatureControlManagerTest {
         FeatureControlManager manager = new FeatureControlManager.Builder().
             setQuorumFeatures(features("foo", 1, 2)).
             setSnapshotRegistry(snapshotRegistry).
+            setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
             build();
         snapshotRegistry.getOrCreateSnapshot(-1);
-        assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 1), -1),
+        assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 4), -1),
             manager.finalizedFeatures(-1));
         assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
@@ -128,10 +131,11 @@ public class FeatureControlManagerTest {
                 setLogContext(logContext).
                 setQuorumFeatures(features("foo", 1, 2)).
                 setSnapshotRegistry(snapshotRegistry).
+                setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
                 build();
         manager.replay(record);
         snapshotRegistry.getOrCreateSnapshot(123);
-        assertEquals(new FinalizedControllerFeatures(versionMap("metadata.version", 1, "foo", 2), 123),
+        assertEquals(new FinalizedControllerFeatures(versionMap("metadata.version", 4, "foo", 2), 123),
             manager.finalizedFeatures(123));
     }
 
@@ -204,6 +208,7 @@ public class FeatureControlManagerTest {
             setLogContext(logContext).
             setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
             setSnapshotRegistry(snapshotRegistry).
+            setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
             build();
         ControllerResult<Map<String, ApiError>> result = manager.
             updateFeatures(updateMap("foo", 5, "bar", 1),
@@ -212,7 +217,7 @@ public class FeatureControlManagerTest {
         RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
             Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
                     setName("metadata.version").
-                    setFeatureLevel((short) 1), (short) 0)),
+                    setFeatureLevel((short) 4), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName("foo").
                 setFeatureLevel((short) 5), (short) 0)),
@@ -222,52 +227,147 @@ public class FeatureControlManagerTest {
             manager.iterator(Long.MAX_VALUE));
     }
 
+    private static final FeatureControlManager.Builder TEST_MANAGER_BUILDER1 =
+            new FeatureControlManager.Builder().
+                    setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
+                            MetadataVersion.IBP_3_3_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())).
+                    setMetadataVersion(MetadataVersion.IBP_3_3_IV2);
+
     @Test
     public void testApplyMetadataVersionChangeRecord() {
-        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
-                MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
-        FeatureControlManager manager = new FeatureControlManager.Builder().
-            setQuorumFeatures(features).build();
+        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
         manager.replay(new FeatureLevelRecord().
             setName(MetadataVersion.FEATURE_NAME).
-            setFeatureLevel(MetadataVersion.IBP_3_0_IV1.featureLevel()));
-        assertEquals(MetadataVersion.IBP_3_0_IV1, manager.metadataVersion());
+            setFeatureLevel(MetadataVersion.IBP_3_3_IV3.featureLevel()));
+        assertEquals(MetadataVersion.IBP_3_3_IV3, manager.metadataVersion());
+    }
+
+    @Test
+    public void testCannotDowngradeToVersionBeforeMinimumSupportedKraftVersion() {
+        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+            singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "Invalid update version 3 for feature metadata.version. Local controller 0 only " +
+                "supports versions 4-7"))),
+            manager.updateFeatures(
+                singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
+                singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+                emptyMap(),
+                true));
+    }
+
+    @Test
+    public void testCannotDowngradeToHigherVersion() {
+        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+            singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "Invalid update version 7 for feature metadata.version. Can't downgrade to a " +
+                "newer version."))),
+            manager.updateFeatures(
+                singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
+                singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+                emptyMap(),
+                true));
+    }
+
+    @Test
+    public void testCannotUnsafeDowngradeToHigherVersion() {
+        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+            singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "Invalid update version 7 for feature metadata.version. Can't downgrade to a " +
+                "newer version."))),
+            manager.updateFeatures(
+                singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
+                singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+                emptyMap(),
+                true));
+    }
+
+    @Test
+    public void testCannotUpgradeToLowerVersion() {
+        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+            singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "Invalid update version 4 for feature metadata.version. Can't downgrade the " +
+                "version of this feature without setting the upgrade type to either safe or " +
+                "unsafe downgrade."))),
+            manager.updateFeatures(
+                singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
+                singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
+                emptyMap(),
+                true));
+    }
+
+    @Test
+    public void testCanUpgradeToHigherVersion() {
+        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+            singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
+            manager.updateFeatures(
+                singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
+                singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
+                emptyMap(),
+                true));
     }
 
     @Test
-    public void testDowngradeMetadataVersion() {
-        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
-                MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
+    public void testCannotUseSafeDowngradeIfMetadataChanged() {
+        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+            singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
+                "Invalid metadata.version 4. Refusing to perform the requested downgrade because " +
+                "it might delete metadata information. Retry using UNSAFE_DOWNGRADE if you want to " +
+                "force the downgrade to proceed."))),
+            manager.updateFeatures(
+                singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
+                singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+                emptyMap(),
+                true));
+    }
+
+    @Test
+    public void testCanUseUnsafeDowngradeIfMetadataChanged() {
+        FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+                        singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
+                manager.updateFeatures(
+                        singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
+                        singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+                        emptyMap(),
+                        true));
+    }
+
+    @Test
+    public void testCanUseSafeDowngradeIfMetadataDidNotChange() {
         FeatureControlManager manager = new FeatureControlManager.Builder().
-            setQuorumFeatures(features).
-            setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
-            build();
-        assertEquals(manager.metadataVersion(), MetadataVersion.IBP_3_3_IV0);
-
-        ControllerResult<Map<String, ApiError>> result;
-        result = manager.updateFeatures(
-            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
-            Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
-            Collections.emptyMap(),
-            true);
-        assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
-
-
-        result = manager.updateFeatures(
-            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_1_IV0.featureLevel()),
-            Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
-            Collections.emptyMap(),
-            true);
-        assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
-
-        result = manager.updateFeatures(
-                Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()),
-                Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
-                Collections.emptyMap(),
-                true);
-        assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
-        assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 3-4",
-            result.response().get(MetadataVersion.FEATURE_NAME).message());
+                setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
+                        MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV1.featureLevel())).
+                setMetadataVersion(MetadataVersion.IBP_3_1_IV0).
+                setMinimumBootstrapVersion(MetadataVersion.IBP_3_0_IV0).build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+                        singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
+                manager.updateFeatures(
+                        singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV1.featureLevel()),
+                        singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+                        emptyMap(),
+                        true));
+    }
+
+    @Test
+    public void testCannotDowngradeBefore3_3_IV0() {
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
+                    MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())).
+                    setMetadataVersion(MetadataVersion.IBP_3_3_IV0).build();
+        assertEquals(ControllerResult.of(Collections.emptyList(),
+                        singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
+                        "Invalid metadata.version 3. Unable to set a metadata.version less than 3.3-IV0"))),
+                manager.updateFeatures(
+                        singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
+                        singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+                        emptyMap(),
+                        true));
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java b/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java
new file mode 100644
index 00000000000..eb0bd0fb70a
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.NoOpRecord;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class LogReplayTrackerTest {
+    @Test
+    public void testEmpty() {
+        LogReplayTracker tracker = new LogReplayTracker.Builder().build();
+        assertTrue(tracker.empty());
+        tracker.replay(new NoOpRecord());
+        assertFalse(tracker.empty());
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index e8392895626..89ef0b083d2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.kafka.controller;
 
+import java.io.File;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,7 +35,6 @@ import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +48,7 @@ import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigResource;
@@ -84,22 +87,28 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.metalog.LocalLogManager;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.snapshot.FileRawSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.Snapshots;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.mockito.Mockito;
 
 import static java.util.function.Function.identity;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -115,10 +124,13 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 
 @Timeout(value = 40)
 public class QuorumControllerTest {
+    static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
+            fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");
 
     /**
      * Test creating a new QuorumController and closing it.
@@ -148,7 +160,9 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                setBrokerId(0).setClusterId(logEnv.clusterId())).get();
+                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
+                setBrokerId(0).
+                setClusterId(logEnv.clusterId())).get();
             testConfigurationOperations(controlEnv.activeController());
         }
     }
@@ -184,7 +198,9 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                    setBrokerId(0).setClusterId(logEnv.clusterId())).get();
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
+                    setBrokerId(0).
+                    setClusterId(logEnv.clusterId())).get();
             testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
         }
     }
@@ -216,9 +232,13 @@ public class QuorumControllerTest {
 
         try (
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv,
+                b -> {
+                    b.setConfigSchema(SCHEMA);
+                },
+                OptionalLong.of(sessionTimeoutMillis),
+                OptionalLong.empty(),
+                SIMPLE_BOOTSTRAP);
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -231,6 +251,7 @@ public class QuorumControllerTest {
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
+                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                         setIncarnationId(Uuid.randomUuid()).
                         setListeners(listeners));
                 brokerEpochs.put(brokerId, reply.get().epoch());
@@ -308,9 +329,13 @@ public class QuorumControllerTest {
 
         try (
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), BootstrapMetadata.create(MetadataVersion.latest()));
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv,
+                b -> {
+                    b.setConfigSchema(SCHEMA);
+                },
+                OptionalLong.of(sessionTimeoutMillis),
+                OptionalLong.of(leaderImbalanceCheckIntervalNs),
+                SIMPLE_BOOTSTRAP);
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -323,6 +348,7 @@ public class QuorumControllerTest {
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
+                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                         setIncarnationId(Uuid.randomUuid()).
                         setListeners(listeners));
                 brokerEpochs.put(brokerId, reply.get().epoch());
@@ -383,6 +409,7 @@ public class QuorumControllerTest {
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
+                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                         setIncarnationId(Uuid.randomUuid()).
                         setListeners(listeners));
                 brokerEpochs.put(brokerId, reply.get().epoch());
@@ -435,13 +462,10 @@ public class QuorumControllerTest {
         long maxReplicationDelayMs = 60_000;
         try (
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty());
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
-                logEnv,
-                builder -> {
-                    builder.setConfigSchema(SCHEMA)
-                        .setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs));
-                }
-            );
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
+                b.setConfigSchema(SCHEMA);
+                b.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs));
+            });
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -493,7 +517,7 @@ public class QuorumControllerTest {
                         setBrokerId(0).
                         setClusterId(active.clusterId()).
                         setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                        setFeatures(brokerFeatures()).
+                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                         setListeners(listeners));
                 assertEquals(2L, reply.get().epoch());
                 CreateTopicsRequestData createTopicsRequestData =
@@ -551,6 +575,18 @@ public class QuorumControllerTest {
         return features;
     }
 
+    private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(
+        MetadataVersion minVersion,
+        MetadataVersion maxVersion
+    ) {
+        RegisterBrokerRecord.BrokerFeatureCollection features = new RegisterBrokerRecord.BrokerFeatureCollection();
+        features.add(new RegisterBrokerRecord.BrokerFeature().
+                setName(MetadataVersion.FEATURE_NAME).
+                setMinSupportedVersion(minVersion.featureLevel()).
+                setMaxSupportedVersion(maxVersion.featureLevel()));
+        return features;
+    }
+
     @Test
     public void testSnapshotSaveAndLoad() throws Throwable {
         final int numBrokers = 4;
@@ -558,9 +594,13 @@ public class QuorumControllerTest {
         RawSnapshotReader reader = null;
         Uuid fooId;
         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
-            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                b.setConfigSchema(SCHEMA);
-            })) {
+            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
+                logEnv,
+                b -> b.setConfigSchema(SCHEMA),
+                OptionalLong.empty(),
+                OptionalLong.empty(),
+                SIMPLE_BOOTSTRAP)
+            ) {
                 QuorumController active = controlEnv.activeController();
                 for (int i = 0; i < numBrokers; i++) {
                     BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
@@ -568,6 +608,7 @@ public class QuorumControllerTest {
                             setBrokerId(i).
                             setRack(null).
                             setClusterId(active.clusterId()).
+                            setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                             setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
                             setListeners(new ListenerCollection(Arrays.asList(new Listener().
                                 setName("PLAINTEXT").setHost("localhost").
@@ -630,6 +671,7 @@ public class QuorumControllerTest {
             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
                 b.setConfigSchema(SCHEMA);
                 b.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
+                b.setBootstrapMetadata(SIMPLE_BOOTSTRAP);
             })) {
                 QuorumController active = controlEnv.activeController();
                 for (int i = 0; i < numBrokers; i++) {
@@ -638,6 +680,7 @@ public class QuorumControllerTest {
                             setBrokerId(i).
                             setRack(null).
                             setClusterId(active.clusterId()).
+                            setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                             setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
                             setListeners(new ListenerCollection(Arrays.asList(new Listener().
                                 setName("PLAINTEXT").setHost("localhost").
@@ -666,7 +709,7 @@ public class QuorumControllerTest {
                     Collections.singleton("foo")).get();
                 fooId = fooData.topics().find("foo").topicId();
                 active.allocateProducerIds(ANONYMOUS_CONTEXT,
-                    new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
+                        new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
 
                 SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(logEnv.waitForLatestSnapshot());
                 checkSnapshotSubcontent(
@@ -694,6 +737,7 @@ public class QuorumControllerTest {
                             setBrokerId(i).
                             setRack(null).
                             setClusterId(active.clusterId()).
+                            setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                             setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
                             setListeners(new ListenerCollection(Arrays.asList(new Listener().
                                 setName("PLAINTEXT").setHost("localhost").
@@ -748,7 +792,7 @@ public class QuorumControllerTest {
         return Arrays.asList(
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(MetadataVersion.FEATURE_NAME).
-                setFeatureLevel(MetadataVersion.latest().featureLevel()), (short) 0),
+                setFeatureLevel(MetadataVersion.IBP_3_3_IV3.featureLevel()), (short) 0),
             new ApiMessageAndVersion(new TopicRecord().
                 setName("foo").setTopicId(fooId), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
@@ -769,6 +813,7 @@ public class QuorumControllerTest {
                         Arrays.asList(
                             new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9092).setSecurityProtocol((short) 0)).iterator())).
+                setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                 setRack(null).
                 setFenced(false), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
@@ -779,6 +824,7 @@ public class QuorumControllerTest {
                         Arrays.asList(
                             new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9093).setSecurityProtocol((short) 0)).iterator())).
+                setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                 setRack(null).
                 setFenced(false), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
@@ -789,6 +835,7 @@ public class QuorumControllerTest {
                         Arrays.asList(
                             new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9094).setSecurityProtocol((short) 0)).iterator())).
+                setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                 setRack(null).
                 setFenced(false), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
@@ -797,6 +844,7 @@ public class QuorumControllerTest {
                 setEndPoints(new BrokerEndpointCollection(Arrays.asList(
                     new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                         setPort(9095).setSecurityProtocol((short) 0)).iterator())).
+                setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                 setRack(null), (short) 1),
             new ApiMessageAndVersion(new ProducerIdsRecord().
                 setBrokerId(0).
@@ -843,13 +891,13 @@ public class QuorumControllerTest {
             while (expectedIndex < expected.size() && !expected.get(expectedIndex).equals(current)) {
                 expectedIndex += 1;
             }
+
+            if (expectedIndex >= expected.size()) {
+                fail("Failed to find record " + current + " in the expected record set: " + expected);
+            }
+
             expectedIndex += 1;
         }
-
-        assertTrue(
-            expectedIndex <= expected.size(),
-            String.format("actual is not a subset of expected: expected = %s; actual = %s", expected, actual)
-        );
     }
 
     /**
@@ -952,6 +1000,7 @@ public class QuorumControllerTest {
         }
     }
 
+    @Disabled // TODO: need to fix leader election in the LocalLogManager test harness.
     @Test
     public void testMissingInMemorySnapshot() throws Exception {
         int numBrokers = 3;
@@ -1060,6 +1109,7 @@ public class QuorumControllerTest {
                     .setBrokerId(brokerId)
                     .setRack(null)
                     .setClusterId(controller.clusterId())
+                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3))
                     .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
                     .setListeners(
                         new ListenerCollection(
@@ -1214,87 +1264,112 @@ public class QuorumControllerTest {
         }
     }
 
-    @Test
-    public void testInvalidBootstrapMetadata() throws Exception {
-        // We can't actually create a BootstrapMetadata with an invalid version, so we have to mock it
-        BootstrapMetadata bootstrapMetadata = Mockito.mock(BootstrapMetadata.class);
-        CyclicBarrier barrier = new CyclicBarrier(2);
-        Mockito.when(bootstrapMetadata.metadataVersion()).thenAnswer(__ -> {
-            // This barrier allows us to catch the controller after it becomes leader, but before the bootstrapping fails
-            barrier.await(10, TimeUnit.SECONDS);
-            return MetadataVersion.IBP_2_8_IV0;
-        });
-        try (
-                LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
-                QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
-                    b.setConfigSchema(SCHEMA);
-                }, OptionalLong.empty(), OptionalLong.empty(), bootstrapMetadata);
-        ) {
-            QuorumController controller = controlEnv.activeController();
-            assertTrue(controller.isActive());
-            // Unblock the first call to BootstrapMetadata#metadataVersion
-            barrier.await(10, TimeUnit.SECONDS);
-            // Unblock the second call to BootstrapMetadata#metadataVersion
-            barrier.await(10, TimeUnit.SECONDS);
-            TestUtils.waitForCondition(() -> !controller.isActive(),
-                "Timed out waiting for controller to renounce itself after bad bootstrap metadata version.");
+    static class InitialSnapshot implements AutoCloseable {
+        File tempDir = null;
+        BatchFileWriter writer = null;
+
+        public InitialSnapshot(List<ApiMessageAndVersion> records) throws Exception {
+            tempDir = TestUtils.tempDirectory();
+            Path path = Snapshots.snapshotPath(tempDir.toPath(), new OffsetAndEpoch(0, 0));
+            writer = BatchFileWriter.open(path);
+            writer.append(records);
+            writer.close();
+            writer = null;
+        }
+
+        @Override
+        public void close() throws Exception {
+            Utils.closeQuietly(writer, "BatchFileWriter");
+            Utils.delete(tempDir);
         }
     }
 
+    private final static List<ApiMessageAndVersion> PRE_PRODUCTION_RECORDS =
+            Collections.unmodifiableList(Arrays.asList(
+                new ApiMessageAndVersion(new RegisterBrokerRecord().
+                        setBrokerEpoch(42).
+                        setBrokerId(123).
+                        setIncarnationId(Uuid.fromString("v78Gbc6sQXK0y5qqRxiryw")).
+                        setRack(null),
+                        (short) 0),
+                new ApiMessageAndVersion(new UnfenceBrokerRecord().
+                        setEpoch(42).
+                        setId(123),
+                        (short) 0),
+                new ApiMessageAndVersion(new TopicRecord().
+                        setName("bar").
+                        setTopicId(Uuid.fromString("cxBT72dK4si8Ied1iP4wBA")),
+                        (short) 0)));
+
+    private final static BootstrapMetadata COMPLEX_BOOTSTRAP = BootstrapMetadata.fromRecords(
+            Arrays.asList(
+                new ApiMessageAndVersion(new FeatureLevelRecord().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setFeatureLevel(MetadataVersion.IBP_3_3_IV1.featureLevel()),
+                        (short) 0),
+                new ApiMessageAndVersion(new ConfigRecord().
+                        setResourceType(BROKER.id()).
+                        setResourceName("").
+                        setName("foo").
+                        setValue("bar"),
+                        (short) 0)),
+            "test bootstrap");
+
     @Test
-    public void testBootstrapMetadataStartupRace() throws Throwable {
-        // KAFKA-13966: This tests a race condition between external RPC calls being handled before the bootstrap
-        // metadata is written. We instrument this by forcing the BootstrapMetadata#records method to block until a
-        // latch has been completed. This allows an asynchronous broker registration call to be handled before the
-        // handleLeaderChange callback completes. In this case, the registration should fail because the bootstrap
-        // metadata includes an unsupported metadata.version.
-        BootstrapMetadata bootstrapMetadata = BootstrapMetadata.create(MetadataVersion.latest());
-        BootstrapMetadata mockedMetadata = Mockito.mock(BootstrapMetadata.class);
-        CountDownLatch latch = new CountDownLatch(1);
-        Mockito.when(mockedMetadata.metadataVersion()).thenReturn(bootstrapMetadata.metadataVersion());
-        Mockito.when(mockedMetadata.records()).then(__ -> {
-            if (latch.await(30, TimeUnit.SECONDS)) {
-                return bootstrapMetadata.records();
-            } else {
-                throw new RuntimeException("Latch never completed");
+    public void testUpgradeFromPreProductionVersion() throws Exception {
+        try (InitialSnapshot initialSnapshot = new InitialSnapshot(PRE_PRODUCTION_RECORDS)) {
+            try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of(
+                    FileRawSnapshotReader.open(initialSnapshot.tempDir.toPath(), new OffsetAndEpoch(0, 0)))
+            )) {
+                try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
+                    b.setConfigSchema(SCHEMA);
+                }, OptionalLong.empty(), OptionalLong.empty(), COMPLEX_BOOTSTRAP)) {
+                    QuorumController active = controlEnv.activeController();
+                    TestUtils.waitForCondition(() ->
+                        active.featureControl().metadataVersion().equals(MetadataVersion.IBP_3_0_IV1),
+                        "Failed to get a metadata version of " + MetadataVersion.IBP_3_0_IV1);
+                    // The ConfigRecord in our bootstrap should not have been applied, since there
+                    // were already records present.
+                    assertEquals(Collections.emptyMap(), active.configurationControl().
+                            getConfigs(new ConfigResource(BROKER, "")));
+                }
             }
-        });
+        }
+    }
 
-        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
+    @Test
+    public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())
+        ) {
             try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
                 b.setConfigSchema(SCHEMA);
-            }, OptionalLong.empty(), OptionalLong.empty(), mockedMetadata)) {
-                ListenerCollection listeners = new ListenerCollection();
-                listeners.add(new Listener().setName("PLAINTEXT").
-                    setHost("localhost").setPort(9092));
+            }, OptionalLong.empty(), OptionalLong.empty(), COMPLEX_BOOTSTRAP)) {
                 QuorumController active = controlEnv.activeController();
 
-                // Issue a register broker request concurrently as the controller is initializing
-                assertEquals(1, latch.getCount(), "Latch should not have been completed yet");
-                CompletableFuture<Void> registrationFuture = new CompletableFuture<>();
-                Thread registerThread = new Thread(() -> {
-                    try {
-                        CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
-                            ANONYMOUS_CONTEXT,
-                            new BrokerRegistrationRequestData().
-                                setBrokerId(0).
-                                setClusterId(active.clusterId()).
-                                setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV0, MetadataVersion.IBP_3_3_IV0)).
-                                setListeners(listeners));
-                        // Once we have the future, the register broker event has been enqueued
-                        latch.countDown();
-                        reply.get();
-                        registrationFuture.complete(null);
-                    } catch (Throwable t) {
-                        registrationFuture.completeExceptionally(t);
-                    }
-                });
-                registerThread.start();
-                registerThread.join(30_000);
-                assertTrue(registrationFuture.isCompletedExceptionally(),
-                    "Should not be able to register broker since the bootstrap metadata specified an incompatible metadata.version");
-                assertEquals(0, active.clusterControl().brokerRegistrations().size());
+                ControllerRequestContext ctx = new ControllerRequestContext(
+                    new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(Long.MAX_VALUE));
+
+                TestUtils.waitForCondition(() -> {
+                    FinalizedControllerFeatures features = active.finalizedFeatures(ctx).get();
+                    Optional<Short> metadataVersionOpt = features.get(MetadataVersion.FEATURE_NAME);
+                    return Optional.of(MetadataVersion.IBP_3_3_IV1.featureLevel()).equals(metadataVersionOpt);
+                }, "Failed to see expected metadata version from bootstrap metadata");
+
+                TestUtils.waitForCondition(() -> {
+                    ConfigResource defaultBrokerResource = new ConfigResource(BROKER, "");
+
+                    Map<ConfigResource, Collection<String>> configs = Collections.singletonMap(
+                        defaultBrokerResource,
+                        Collections.emptyList()
+                    );
+
+                    Map<ConfigResource, ResultOrError<Map<String, String>>> results =
+                        active.describeConfigs(ctx, configs).get();
+
+                    ResultOrError<Map<String, String>> resultOrError = results.get(defaultBrokerResource);
+                    return resultOrError.isResult() &&
+                        Collections.singletonMap("foo", "bar").equals(resultOrError.result());
+                }, "Failed to see expected config change from bootstrap metadata");
             }
         }
     }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 40dd21c88d3..fd56ef6a644 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -17,7 +17,14 @@
 
 package org.apache.kafka.controller;
 
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.controller.QuorumController.Builder;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
+import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -29,20 +36,9 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.controller.QuorumController.Builder;
-import org.apache.kafka.metalog.LocalLogManagerTestEnv;
-import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.server.fault.MockFaultHandler;
-import org.apache.kafka.test.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class QuorumControllerTestEnv implements AutoCloseable {
-    private static final Logger log =
-        LoggerFactory.getLogger(QuorumControllerTestEnv.class);
-
     private final List<QuorumController> controllers;
     private final LocalLogManagerTestEnv logEnv;
     private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
@@ -52,7 +48,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         LocalLogManagerTestEnv logEnv,
         Consumer<QuorumController.Builder> builderConsumer
     ) throws Exception {
-        this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
+        this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(),
+                BootstrapMetadata.fromVersion(MetadataVersion.latest(), "test-provided version"));
     }
 
     public QuorumControllerTestEnv(
@@ -62,7 +59,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
             OptionalLong leaderImbalanceCheckIntervalNs,
             MetadataVersion metadataVersion
     ) throws Exception {
-        this(logEnv, builderConsumer, sessionTimeoutMillis, leaderImbalanceCheckIntervalNs, BootstrapMetadata.create(metadataVersion));
+        this(logEnv, builderConsumer, sessionTimeoutMillis, leaderImbalanceCheckIntervalNs,
+                BootstrapMetadata.fromVersion(metadataVersion, "test-provided version"));
     }
 
     public QuorumControllerTestEnv(
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
new file mode 100644
index 00000000000..73ad2b07d9f
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.NoOpRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.File;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(40)
+public class BootstrapDirectoryTest {
+    final static List<ApiMessageAndVersion> SAMPLE_RECORDS1 = unmodifiableList(asList(
+            new ApiMessageAndVersion(new FeatureLevelRecord().
+                    setName(MetadataVersion.FEATURE_NAME).
+                    setFeatureLevel((short) 7), (short) 0),
+            new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
+            new ApiMessageAndVersion(new NoOpRecord(), (short) 0)));
+
+    static class BootstrapTestDirectory implements AutoCloseable {
+        File directory = null;
+
+        synchronized BootstrapTestDirectory createDirectory() throws Exception {
+            directory = TestUtils.tempDirectory("BootstrapTestDirectory");
+            return this;
+        }
+
+        synchronized String path() {
+            return directory.getAbsolutePath();
+        }
+
+        synchronized String binaryBootstrapPath() {
+            return new File(directory, BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME).getAbsolutePath();
+        }
+
+        @Override
+        public synchronized void close() throws Exception {
+            if (directory != null) {
+                Utils.delete(directory);
+            }
+            directory = null;
+        }
+    }
+
+    @Test
+    public void testReadFromEmptyConfiguration() throws Exception {
+        try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) {
+            assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latest(),
+                    "the default bootstrap"),
+                new BootstrapDirectory(testDirectory.path(), Optional.empty()).read());
+        }
+    }
+
+    @Test
+    public void testReadFromConfigurationWithAncientVersion() throws Exception {
+        try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) {
+            assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,
+                    "the minimum version bootstrap with metadata.version 3.3-IV0"),
+                new BootstrapDirectory(testDirectory.path(), Optional.of("2.7")).read());
+        }
+    }
+
+    @Test
+    public void testReadFromConfiguration() throws Exception {
+        try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) {
+            assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_3_IV2,
+                    "the configured bootstrap with metadata.version 3.3-IV2"),
+                new BootstrapDirectory(testDirectory.path(), Optional.of("3.3-IV2")).read());
+        }
+    }
+
+    @Test
+    public void testMissingDirectory() throws Exception {
+        assertEquals("No such directory as ./non/existent/directory",
+            assertThrows(RuntimeException.class, () ->
+                new BootstrapDirectory("./non/existent/directory", Optional.empty()).read()).getMessage());
+    }
+
+    @Test
+    public void testReadFromConfigurationFile() throws Exception {
+        try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) {
+            BootstrapDirectory directory = new BootstrapDirectory(testDirectory.path(), Optional.of("3.0-IV0"));
+            BootstrapMetadata metadata = BootstrapMetadata.fromRecords(SAMPLE_RECORDS1,
+                    "the binary bootstrap metadata file: " + testDirectory.binaryBootstrapPath());
+            directory.writeBinaryFile(metadata);
+            assertEquals(metadata, directory.read());
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
new file mode 100644
index 00000000000..4dbb90f7c70
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.NoOpRecord;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.unmodifiableList;
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@Timeout(60)
+public class BootstrapMetadataTest {
+    /** Sample records; the tests expect the last metadata.version record (level 6) to take effect. */
+    final static List<ApiMessageAndVersion> SAMPLE_RECORDS1 = unmodifiableList(asList(
+        metadataVersionRecord((short) 7),
+        new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
+        metadataVersionRecord((short) 6)));
+
+    /** A lone metadata.version record at the IBP_3_0_IV1 level, which fromRecords is expected to reject. */
+    final static List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION =
+        unmodifiableList(asList(metadataVersionRecord(IBP_3_0_IV1.featureLevel())));
+
+    /** Builds a version-0 FeatureLevelRecord wrapper that sets metadata.version to the given level. */
+    private static ApiMessageAndVersion metadataVersionRecord(short featureLevel) {
+        return new ApiMessageAndVersion(
+            new FeatureLevelRecord().setName(FEATURE_NAME).setFeatureLevel(featureLevel), (short) 0);
+    }
+
+    @Test
+    public void testFromVersion() throws Exception {
+        BootstrapMetadata expected = new BootstrapMetadata(
+            Collections.singletonList(metadataVersionRecord((short) 6)), IBP_3_3_IV2, "foo");
+        assertEquals(expected, BootstrapMetadata.fromVersion(IBP_3_3_IV2, "foo"));
+    }
+
+    @Test
+    public void testFromRecordsList() throws Exception {
+        BootstrapMetadata expected = new BootstrapMetadata(SAMPLE_RECORDS1, IBP_3_3_IV2, "bar");
+        assertEquals(expected, BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "bar"));
+    }
+
+    @Test
+    public void testFromRecordsListWithoutMetadataVersion() throws Exception {
+        RuntimeException exception = assertThrows(RuntimeException.class,
+            () -> BootstrapMetadata.fromRecords(emptyList(), "quux"));
+        assertEquals("No FeatureLevelRecord for metadata.version was found in the bootstrap " +
+            "metadata from quux", exception.getMessage());
+    }
+
+    @Test
+    public void testCopyWithOnlyVersion() throws Exception {
+        BootstrapMetadata expected = new BootstrapMetadata(SAMPLE_RECORDS1.subList(2, 3), IBP_3_3_IV2, "baz");
+        assertEquals(expected, BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "baz").copyWithOnlyVersion());
+    }
+
+    @Test
+    public void testFromRecordsListWithOldMetadataVersion() throws Exception {
+        assertEquals("Bootstrap metadata versions before 3.3-IV0 are not supported. Can't load " +
+            "metadata from quux", assertThrows(RuntimeException.class,
+                () -> BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux")).getMessage());
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
index 89224231898..bda340dce55 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
@@ -59,7 +59,7 @@ public class SnapshotRegistryTest {
         assertEquals(snapshot123, registry.getSnapshot(123));
         assertThrows(RuntimeException.class, () -> registry.getSnapshot(456));
         assertIteratorContains(registry.iterator(), snapshot123);
-        assertEquals("Can't create a new snapshot at epoch 1 because there is already " +
+        assertEquals("Can't create a new in-memory snapshot at epoch 1 because there is already " +
             "a snapshot with epoch 123", assertThrows(RuntimeException.class,
                 () -> registry.getOrCreateSnapshot(1)).getMessage());
         Snapshot snapshot456 = registry.getOrCreateSnapshot(456);
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index 1b9dd1559ea..93e6b127ab3 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -140,7 +140,7 @@ public class SnapshottableHashTableTest {
         assertEquals(1, table.snapshottableSize(0));
         assertEquals(3, table.snapshottableSize(1));
         registry.deleteSnapshot(0);
-        assertEquals("No snapshot for epoch 0. Snapshot epochs are: 1",
+        assertEquals("No in-memory snapshot for epoch 0. Snapshot epochs are: 1",
             assertThrows(RuntimeException.class, () ->
                 table.snapshottableSize(0)).getMessage());
         registry.deleteSnapshot(1);
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 55916470cbd..295c91c51fd 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -167,8 +167,17 @@ public enum MetadataVersion {
     
     public static final String FEATURE_NAME = "metadata.version";
 
+    /**
+     * The first version we currently support in KRaft.
+     */
     public static final MetadataVersion MINIMUM_KRAFT_VERSION = IBP_3_0_IV1;
 
+    /**
+     * The first version we currently support in the bootstrap metadata. We chose 3.3-IV0 since it
+     * is the first version that supports storing the metadata.version in the log.
+     */
+    public static final MetadataVersion MINIMUM_BOOTSTRAP_VERSION = IBP_3_3_IV0;
+
     public static final MetadataVersion[] VERSIONS;
 
     private final short featureLevel;
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 99f9cc3515c..51933c5ea01 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.server.common;
 
 import org.apache.kafka.common.record.RecordVersion;
 
-import java.util.Arrays;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -72,20 +71,18 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MetadataVersionTest {
-
     @Test
-    public void testFeatureLevel() {
-        MetadataVersion[] metadataVersions = MetadataVersion.VERSIONS;
-        int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(MetadataVersion.MINIMUM_KRAFT_VERSION);
-        for (int i = 0; i < firstFeatureLevelIndex; i++) {
-            assertTrue(metadataVersions[i].featureLevel() < 0);
+    public void testKRaftFeatureLevelsBefore3_0_IV1() {
+        for (int i = 0; i < MetadataVersion.IBP_3_0_IV1.ordinal(); i++) {
+            assertEquals(-1, MetadataVersion.VERSIONS[i].featureLevel());
         }
-        short expectedFeatureLevel = 1;
-        for (int i = firstFeatureLevelIndex; i < metadataVersions.length; i++) {
-            MetadataVersion metadataVersion = metadataVersions[i];
-            assertEquals(expectedFeatureLevel, metadataVersion.featureLevel(),
-                    String.format("Metadata version %s should have feature level %s", metadataVersion.featureLevel(), expectedFeatureLevel));
-            expectedFeatureLevel += 1;
+    }
+
+    @Test
+    public void testKRaftFeatureLevelsAtAndAfter3_0_IV1() {
+        int firstKRaftIndex = MetadataVersion.IBP_3_0_IV1.ordinal();
+        for (int i = firstKRaftIndex; i < MetadataVersion.VERSIONS.length; i++) {
+            assertEquals(i - firstKRaftIndex + 1, MetadataVersion.VERSIONS[i].featureLevel());
         }
     }