Posted to commits@kafka.apache.org by cm...@apache.org on 2022/05/18 19:08:46 UTC

[kafka] branch trunk updated: KAFKA-13830 MetadataVersion integration for KRaft controller (#12050)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1135f22eaf KAFKA-13830 MetadataVersion integration for KRaft controller (#12050)
1135f22eaf is described below

commit 1135f22eaf404fdf76489302648199578876c4ac
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed May 18 15:08:36 2022 -0400

    KAFKA-13830 MetadataVersion integration for KRaft controller (#12050)
    
    This patch builds on #12072 and adds controller support for metadata.version. The kafka-storage tool now allows a
    user to specify the metadata.version to bootstrap into the cluster; otherwise, the latest version is used.
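    
    As a hedged illustration of the new workflow (the --metadata-version flag is the one added to StorageTool in this
    patch; the config path, UUID, and log directory shown are made up for the example):
    
        $ bin/kafka-storage.sh random-uuid
        q5ml9qRsR2W_oCLyKdVqbA
        $ bin/kafka-storage.sh format --config config/kraft/server.properties \
            --cluster-id q5ml9qRsR2W_oCLyKdVqbA --metadata-version 1
        Formatting /tmp/kraft-combined-logs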
    
    Upon the first leader election of the KRaft quorum, this initial metadata.version is written into the metadata log. When
    writing snapshots, a FeatureLevelRecord for metadata.version will be written out ahead of other records so that the
    remaining records can be decoded at the correct version level.
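    
    A minimal sketch of that snapshot ordering, using types touched by this patch (FeatureLevelRecord, MetadataVersion,
    ApiMessageAndVersion); the writeBatch calls and exact setter names are assumptions for illustration, not the
    patch's literal code:
    
        // Assumed sketch: emit the metadata.version record before any other snapshot
        // records, so readers know which version to decode the rest of the snapshot at.
        List<ApiMessageAndVersion> header = Collections.singletonList(
            new ApiMessageAndVersion(
                new FeatureLevelRecord()
                    .setName(MetadataVersion.FEATURE_NAME)
                    .setFeatureLevel(metadataVersion.featureLevel()),
                (short) 0));
        snapshotWriter.writeBatch(header);        // hypothetical writer API
        snapshotWriter.writeBatch(otherRecords);  // everything else follows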
    
    This also includes additional validation in the controller when setting feature levels. It will now check that a given
    metadata.version is supportable by the quorum, not just the brokers.
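    
    The shape of that validation, sketched against the VersionRange type this patch extends (the helper name and error
    strings are hypothetical, not the controller's literal code):
    
        // Hypothetical sketch: a metadata.version update is only legal if every quorum
        // member, and not merely every broker, advertises support for the new level.
        Optional<String> reasonNotSupported(short proposed, VersionRange quorumRange, VersionRange brokerRange) {
            if (proposed < quorumRange.min() || proposed > quorumRange.max()) {
                return Optional.of("the controller quorum only supports levels " + quorumRange);
            }
            if (proposed < brokerRange.min() || proposed > brokerRange.max()) {
                return Optional.of("the brokers only support levels " + brokerRange);
            }
            return Optional.empty();  // supported by both the quorum and the brokers
        }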
    
    Reviewers: José Armando García Sancio <js...@gmail.com>, Colin P. McCabe <cm...@apache.org>, dengziming <de...@gmail.com>, Alyssa Huang <ah...@confluent.io>
---
 build.gradle                                       |   3 +-
 checkstyle/import-control-core.xml                 |   1 +
 checkstyle/import-control.xml                      |   3 +
 checkstyle/suppressions.xml                        |   4 +-
 .../org/apache/kafka/clients/NetworkClient.java    |   4 +-
 .../org/apache/kafka/clients/NodeApiVersions.java  |  29 +--
 .../org/apache/kafka/clients/ApiVersionsTest.java  |   2 +-
 .../apache/kafka/clients/NodeApiVersionsTest.java  |  11 +-
 .../producer/internals/TransactionManagerTest.java |   8 +-
 .../kafka/admin/BrokerApiVersionsCommand.scala     |   7 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |   3 +-
 .../main/scala/kafka/server/BrokerFeatures.scala   |   7 +
 .../main/scala/kafka/server/ControllerServer.scala |  16 +-
 .../scala/kafka/server/FinalizedFeatureCache.scala |   5 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  21 ++-
 core/src/main/scala/kafka/server/KafkaServer.scala |   2 +-
 .../server/metadata/BrokerMetadataListener.scala   |   7 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  13 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  |  83 +++++---
 .../main/scala/kafka/tools/TestRaftServer.scala    |   2 +-
 core/src/test/java/kafka/test/ClusterConfig.java   |  25 +--
 core/src/test/java/kafka/test/ClusterInstance.java |   7 +
 .../java/kafka/test/annotation/ClusterTest.java    |   3 +-
 .../kafka/test/junit/ClusterTestExtensions.java    |   5 +-
 .../test/junit/RaftClusterInvocationContext.java   |  12 ++
 .../test/junit/ZkClusterInvocationContext.java     |  12 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |   8 +-
 core/src/test/java/kafka/testkit/TestKitNodes.java |  21 ++-
 .../transaction/ProducerIdsIntegrationTest.scala   |   7 +-
 .../kafka/server/KRaftClusterTest.scala            |  20 +-
 .../server/MetadataVersionIntegrationTest.scala    |  77 ++++++++
 .../kafka/server/QuorumTestHarness.scala           |  14 +-
 .../controller/ControllerChannelManagerTest.scala  |  14 +-
 .../group/GroupMetadataManagerTest.scala           |   8 +-
 .../scala/unit/kafka/raft/RaftManagerTest.scala    |   1 -
 .../server/AbstractApiVersionsRequestTest.scala    |   2 +-
 .../unit/kafka/server/BrokerFeaturesTest.scala     |   2 +-
 .../kafka/server/BrokerLifecycleManagerTest.scala  |   2 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   4 +-
 .../unit/kafka/server/KafkaRaftServerTest.scala    |   8 +-
 ...chDrivenReplicationProtocolAcceptanceTest.scala |   2 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  22 ++-
 .../apache/kafka/controller/BootstrapMetadata.java | 206 ++++++++++++++++++++
 .../kafka/controller/ClusterControlManager.java    |   6 +-
 .../kafka/controller/FeatureControlManager.java    | 209 +++++++++++++++++----
 .../apache/kafka/controller/QuorumController.java  |  74 +++++++-
 .../apache/kafka/controller/QuorumFeatures.java    |  84 ++++++++-
 .../java/org/apache/kafka/image/AclsDelta.java     |   5 +
 .../org/apache/kafka/image/ClientQuotasDelta.java  |   5 +
 .../java/org/apache/kafka/image/ClusterDelta.java  |   5 +
 .../apache/kafka/image/ConfigurationsDelta.java    |   5 +
 .../java/org/apache/kafka/image/FeaturesDelta.java |  28 ++-
 .../java/org/apache/kafka/image/FeaturesImage.java |  22 ++-
 .../java/org/apache/kafka/image/MetadataDelta.java |  19 ++
 .../java/org/apache/kafka/image/MetadataImage.java |   2 +
 .../org/apache/kafka/image/ProducerIdsDelta.java   |   5 +
 .../java/org/apache/kafka/image/TopicsDelta.java   |   5 +
 .../org/apache/kafka/metadata/VersionRange.java    |   4 +
 .../kafka/metadata/util}/SnapshotFileReader.java   |   2 +-
 .../kafka/metadata/util/SnapshotFileWriter.java    |  81 ++++++++
 .../kafka/controller/BootstrapMetadataTest.java    |  83 ++++++++
 .../controller/FeatureControlManagerTest.java      | 102 +++++++++-
 .../kafka/controller/QuorumControllerTest.java     |  17 +-
 .../kafka/controller/QuorumControllerTestEnv.java  |  15 +-
 .../kafka/controller/QuorumFeaturesTest.java       | 104 ++++++++++
 .../org/apache/kafka/image/FeaturesImageTest.java  |   5 +-
 .../kafka/metadata/FeatureLevelListener.java}      |  21 +--
 .../kafka/server/common/MetadataVersion.java       |  96 ++++++++--
 .../server/common/MetadataVersionValidator.java    |   2 +-
 .../kafka/server/common/MetadataVersionTest.java   |  57 +++++-
 .../common/MetadataVersionValidatorTest.java       |   2 +-
 .../java/org/apache/kafka/shell/MetadataShell.java |   1 +
 72 files changed, 1529 insertions(+), 250 deletions(-)

diff --git a/build.gradle b/build.gradle
index 8fd32e6874..6983a971a7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -447,7 +447,8 @@ subprojects {
     maxParallelForks = maxTestForks
     ignoreFailures = userIgnoreFailures
 
-    maxHeapSize = defaultMaxHeapSize
+    // Increase heap size for integration tests
+    maxHeapSize = "2560m"
     jvmArgs = defaultJvmArgs
 
 
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 36e5cc6355..28b325b093 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -82,6 +82,7 @@
     <allow pkg="org.apache.kafka.controller"/>
     <allow pkg="org.apache.kafka.metadata"/>
     <allow pkg="org.apache.kafka.server.authorizer"/>
+    <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="kafka.test.annotation"/>
     <allow pkg="kafka.test.junit"/>
     <allow pkg="kafka.network"/>
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 414e59a614..7b5f20aea4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -259,10 +259,12 @@
     <allow pkg="org.apache.kafka.common.message" />
     <allow pkg="org.apache.kafka.common.metadata" />
     <allow pkg="org.apache.kafka.common.protocol" />
+    <allow pkg="org.apache.kafka.common.record" />
     <allow pkg="org.apache.kafka.common.requests" />
     <allow pkg="org.apache.kafka.image" />
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metalog" />
+    <allow pkg="org.apache.kafka.queue" />
     <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.authorizer" />
     <allow pkg="org.apache.kafka.server.common" />
@@ -348,6 +350,7 @@
     <allow pkg="net.sourceforge.argparse4j" />
     <allow pkg="org.apache.kafka.common"/>
     <allow pkg="org.apache.kafka.metadata"/>
+    <allow pkg="org.apache.kafka.controller.util"/>
     <allow pkg="org.apache.kafka.queue"/>
     <allow pkg="org.apache.kafka.raft"/>
     <allow pkg="org.apache.kafka.server.common" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0cc6c831d9..c5cd99fdaa 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -290,7 +290,7 @@
     <suppress checks="ClassDataAbstractionCoupling"
               files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
-              files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
+              files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
               files="(QuorumController).java"/>
     <suppress checks="CyclomaticComplexity"
@@ -303,6 +303,8 @@
               files="(MetadataImage).java"/>
     <suppress checks="ImportControl"
               files="ApiVersionsResponse.java"/>
+    <suppress checks="AvoidStarImport"
+              files="MetadataVersionTest.java"/>
 
     <!-- Storage -->
     <suppress checks="(CyclomaticComplexity|ParameterNumber)"
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index cabc3cccdd..81463d508a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -915,7 +915,9 @@ public class NetworkClient implements KafkaClient {
             }
             return;
         }
-        NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
+        NodeApiVersions nodeVersionInfo = new NodeApiVersions(
+            apiVersionsResponse.data().apiKeys(),
+            apiVersionsResponse.data().supportedFeatures());
         apiVersions.update(node, nodeVersionInfo);
         this.connectionStates.ready(node);
         log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.",
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 3c09f0eb4e..a3aaa88fee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -17,8 +17,9 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
+import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.Utils;
@@ -27,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +46,8 @@ public class NodeApiVersions {
     // List of APIs which the broker supports, but which are unknown to the client
     private final List<ApiVersion> unknownApis = new ArrayList<>();
 
+    private final Map<String, SupportedVersionRange> supportedFeatures;
+
     /**
      * Create a NodeApiVersions object with the current ApiVersions.
      *
@@ -72,7 +76,7 @@ public class NodeApiVersions {
             }
             if (!exists) apiVersions.add(ApiVersionsResponse.toApiVersion(apiKey));
         }
-        return new NodeApiVersions(apiVersions);
+        return new NodeApiVersions(apiVersions, Collections.emptyList());
     }
 
 
@@ -91,7 +95,7 @@ public class NodeApiVersions {
                 .setMaxVersion(maxVersion)));
     }
 
-    public NodeApiVersions(ApiVersionCollection nodeApiVersions) {
+    public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<SupportedFeatureKey> nodeSupportedFeatures) {
         for (ApiVersion nodeApiVersion : nodeApiVersions) {
             if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
                 ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
@@ -101,18 +105,13 @@ public class NodeApiVersions {
                 unknownApis.add(nodeApiVersion);
             }
         }
-    }
 
-    public NodeApiVersions(Collection<ApiVersion> nodeApiVersions) {
-        for (ApiVersion nodeApiVersion : nodeApiVersions) {
-            if (ApiKeys.hasId(nodeApiVersion.apiKey())) {
-                ApiKeys nodeApiKey = ApiKeys.forId(nodeApiVersion.apiKey());
-                supportedVersions.put(nodeApiKey, nodeApiVersion);
-            } else {
-                // Newer brokers may support ApiKeys we don't know about
-                unknownApis.add(nodeApiVersion);
-            }
+        Map<String, SupportedVersionRange> supportedFeaturesBuilder = new HashMap<>();
+        for (SupportedFeatureKey supportedFeature : nodeSupportedFeatures) {
+            supportedFeaturesBuilder.put(supportedFeature.name(),
+                    new SupportedVersionRange(supportedFeature.minVersion(), supportedFeature.maxVersion()));
         }
+        this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
     }
 
     /**
@@ -233,4 +232,8 @@ public class NodeApiVersions {
     public Map<ApiKeys, ApiVersion> allSupportedApiVersions() {
         return supportedVersions;
     }
+
+    public Map<String, SupportedVersionRange> supportedFeatures() {
+        return supportedFeatures;
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
index 206e95e4d3..8906553643 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
@@ -48,7 +48,7 @@ public class ApiVersionsTest {
         assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
 
         // something that doesn't support PRODUCE, which is the case with Raft-based controllers
-        apiVersions.update("2", new NodeApiVersions(Collections.singleton(
+        apiVersions.update("2", NodeApiVersions.create(Collections.singleton(
             new ApiVersionsResponseData.ApiVersion()
                 .setApiKey(ApiKeys.FETCH.id)
                 .setMinVersion((short) 0)
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index b04d83b47d..f379366ac1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -38,7 +39,7 @@ public class NodeApiVersionsTest {
 
     @Test
     public void testUnsupportedVersionsToString() {
-        NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
+        NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
         StringBuilder bld = new StringBuilder();
         String prefix = "(";
         for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
@@ -67,7 +68,7 @@ public class NodeApiVersionsTest {
                         .setMaxVersion((short) 10001));
             } else versionList.add(ApiVersionsResponse.toApiVersion(apiKey));
         }
-        NodeApiVersions versions = new NodeApiVersions(versionList);
+        NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
         StringBuilder bld = new StringBuilder();
         String prefix = "(";
         for (ApiKeys apiKey : ApiKeys.values()) {
@@ -124,7 +125,7 @@ public class NodeApiVersionsTest {
 
     @Test
     public void testUsableVersionCalculationNoKnownVersions() {
-        NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
+        NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection(), Collections.emptyList());
         assertThrows(UnsupportedVersionException.class,
             () -> versions.latestUsableVersion(ApiKeys.FETCH));
     }
@@ -146,7 +147,7 @@ public class NodeApiVersionsTest {
                 .setApiKey((short) 100)
                 .setMinVersion((short) 0)
                 .setMaxVersion((short) 1));
-        NodeApiVersions versions = new NodeApiVersions(versionList);
+        NodeApiVersions versions = new NodeApiVersions(versionList, Collections.emptyList());
         for (ApiKeys apiKey: ApiKeys.apisForListener(scope)) {
             assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
         }
@@ -156,7 +157,7 @@ public class NodeApiVersionsTest {
     @EnumSource(ApiMessageType.ListenerType.class)
     public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
         ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
-        NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
+        NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList());
 
         for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
             ApiVersion apiVersion = versions.apiVersion(ApiKeys.forId(apiVersionKey.apiKey()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 377db5ec06..b6bf9e6f4f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -153,7 +153,7 @@ public class TransactionManagerTest {
     private void initializeTransactionManager(Optional<String> transactionalId) {
         Metrics metrics = new Metrics(time);
 
-        apiVersions.update("0", new NodeApiVersions(Arrays.asList(
+        apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
                 new ApiVersion()
                     .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                     .setMinVersion((short) 0)
@@ -2615,7 +2615,7 @@ public class TransactionManagerTest {
 
     @Test
     public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException {
-        apiVersions.update("0", new NodeApiVersions(Arrays.asList(
+        apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
                 new ApiVersion()
                     .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                     .setMinVersion((short) 0)
@@ -2814,7 +2814,7 @@ public class TransactionManagerTest {
 
     @Test
     public void testAbortTransactionAndReuseSequenceNumberOnError() throws InterruptedException {
-        apiVersions.update("0", new NodeApiVersions(Arrays.asList(
+        apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
                 new ApiVersion()
                         .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                         .setMinVersion((short) 0)
@@ -2866,7 +2866,7 @@ public class TransactionManagerTest {
         // Set the InitProducerId version such that bumping the epoch number is not supported. This will test the case
         // where the sequence number is reset on an UnknownProducerId error, allowing subsequent transactions to
         // append to the log successfully
-        apiVersions.update("0", new NodeApiVersions(Arrays.asList(
+        apiVersions.update("0", NodeApiVersions.create(Arrays.asList(
                 new ApiVersion()
                     .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                     .setMinVersion((short) 0)
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index a8b536113b..957cb2ce8b 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -40,7 +40,6 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.LogContext
 import org.apache.kafka.common.utils.{KafkaThread, Time}
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
@@ -157,10 +156,10 @@ object BrokerApiVersionsCommand {
       throw new RuntimeException(s"Request ${request.apiKey()} failed on brokers $bootstrapBrokers")
     }
 
-    private def getApiVersions(node: Node): ApiVersionCollection = {
+    private def getNodeApiVersions(node: Node): NodeApiVersions = {
       val response = send(node, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
       Errors.forCode(response.data.errorCode).maybeThrow()
-      response.data.apiKeys
+      new NodeApiVersions(response.data.apiKeys, response.data.supportedFeatures)
     }
 
     /**
@@ -186,7 +185,7 @@ object BrokerApiVersionsCommand {
 
     def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
       findAllBrokers().map { broker =>
-        broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker)))
+        broker -> Try[NodeApiVersions](getNodeApiVersions(broker))
       }.toMap
 
     def close(): Unit = {
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 4c29250073..cbb9f7b89b 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -111,6 +111,7 @@ class KafkaRaftManager[T](
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
 ) extends RaftManager[T] with Logging {
 
+  val apiVersions = new ApiVersions()
   private val raftConfig = new RaftConfig(config)
   private val threadNamePrefix = threadNamePrefixOpt.getOrElse("kafka-raft")
   private val logContext = new LogContext(s"[RaftManager nodeId=${config.nodeId}] ")
@@ -274,7 +275,7 @@ class KafkaRaftManager[T](
       config.connectionSetupTimeoutMaxMs,
       time,
       discoverBrokerVersions,
-      new ApiVersions,
+      apiVersions,
       logContext
     )
   }
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index 9511172c7e..ff7e60908c 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import kafka.utils.Logging
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.server.common.MetadataVersion
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -72,6 +73,12 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
 object BrokerFeatures extends Logging {
 
   def createDefault(): BrokerFeatures = {
+    new BrokerFeatures(Features.supportedFeatures(
+      java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+        new SupportedVersionRange(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()))))
+  }
+
+  def createEmpty(): BrokerFeatures = {
     new BrokerFeatures(Features.emptySupportedFeatures())
   }
 
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index e004996bf7..7a2913e9cf 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -30,13 +30,14 @@ import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.clients.ApiVersions
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
-import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
+import org.apache.kafka.controller.{BootstrapMetadata, Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
 import org.apache.kafka.metadata.KafkaConfigSchema
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -63,6 +64,8 @@ class ControllerServer(
   val threadNamePrefix: Option[String],
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
   val configSchema: KafkaConfigSchema,
+  val raftApiVersions: ApiVersions,
+  val bootstrapMetadata: BootstrapMetadata
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._
 
@@ -162,7 +165,8 @@ class ControllerServer(
       alterConfigPolicy = Option(config.
         getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
 
-      val quorumFeatures = QuorumFeatures.create(config.nodeId, QuorumFeatures.defaultFeatureMap())
+      val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get())
+      val quorumFeatures = QuorumFeatures.create(config.nodeId, raftApiVersions, QuorumFeatures.defaultFeatureMap(), controllerNodes)
 
       val controllerBuilder = {
         val leaderImbalanceCheckIntervalNs = if (config.autoLeaderRebalanceEnable) {
@@ -179,7 +183,7 @@ class ControllerServer(
           setQuorumFeatures(quorumFeatures).
           setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
           setDefaultNumPartitions(config.numPartitions.intValue()).
-          setIsLeaderRecoverySupported(config.interBrokerProtocolVersion.isAtLeast(IBP_3_2_IV0)).
+          setIsLeaderRecoverySupported(bootstrapMetadata.metadataVersion().isAtLeast(IBP_3_2_IV0)).
           setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
             TimeUnit.MILLISECONDS)).
           setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
@@ -188,7 +192,8 @@ class ControllerServer(
           setCreateTopicPolicy(createTopicPolicy.asJava).
           setAlterConfigPolicy(alterConfigPolicy.asJava).
           setConfigurationValidator(new ControllerConfigurationValidator()).
-          setStaticConfig(config.originals)
+          setStaticConfig(config.originals).
+          setBootstrapMetadata(bootstrapMetadata)
       }
       authorizer match {
         case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
@@ -197,7 +202,6 @@ class ControllerServer(
       controller = controllerBuilder.build()
 
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
-      val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
       controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
         authorizer,
         quotaManagers,
@@ -206,7 +210,7 @@ class ControllerServer(
         raftManager,
         config,
         metaProperties,
-        controllerNodes.toSeq,
+        controllerNodes.asScala.toSeq,
         apiVersionManager)
       controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
         socketServer.dataPlaneRequestChannel,
diff --git a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
index ee7337653c..390110dba0 100644
--- a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
+++ b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
@@ -22,10 +22,10 @@ import java.util.Collections
 import kafka.utils.Logging
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
 import org.apache.kafka.image.FeaturesDelta
+import org.apache.kafka.server.common.MetadataVersion
 
 import scala.concurrent.TimeoutException
 import scala.math.max
-
 import scala.compat.java8.OptionConverters._
 
 // Raised whenever there was an error in updating the FinalizedFeatureCache with features.
@@ -144,6 +144,9 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
           new FinalizedVersionRange(version, version))
       }
     }
+    featuresDelta.metadataVersionChange().ifPresent { metadataVersion =>
+      newFeatures.put(MetadataVersion.FEATURE_NAME, new FinalizedVersionRange(metadataVersion.featureLevel(), metadataVersion.featureLevel()))
+    }
     featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(Features.finalizedFeatures(
       Collections.unmodifiableMap(newFeatures)), highestMetadataOffset))
   }
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index a0dd19559c..5a1c3087d3 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import java.io.File
 import java.util.concurrent.CompletableFuture
-
 import kafka.common.InconsistentNodeIdException
 import kafka.log.{LogConfig, UnifiedLog}
 import kafka.metrics.KafkaMetricsReporter
@@ -28,11 +27,13 @@ import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.controller.BootstrapMetadata
 import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
 import org.apache.kafka.raft.RaftConfig
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
+import java.nio.file.Paths
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
@@ -54,7 +55,7 @@ class KafkaRaftServer(
   KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
   KafkaYammerMetrics.INSTANCE.configure(config.originals)
 
-  private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
+  private val (metaProps, bootstrapMetadata, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
 
   private val metrics = Server.initializeMetrics(
     config,
@@ -102,6 +103,8 @@ class KafkaRaftServer(
       threadNamePrefix,
       controllerQuorumVotersFuture,
       KafkaRaftServer.configSchema,
+      raftManager.apiVersions,
+      bootstrapMetadata
     ))
   } else {
     None
@@ -149,7 +152,7 @@ object KafkaRaftServer {
    * @return A tuple containing the loaded meta properties (which are guaranteed to
    *         be consistent across all log dirs) and the offline directories
    */
-  def initializeLogDirs(config: KafkaConfig): (MetaProperties, Seq[String]) = {
+  def initializeLogDirs(config: KafkaConfig): (MetaProperties, BootstrapMetadata, Seq[String]) = {
     val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
     val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
       getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false)
@@ -177,7 +180,15 @@ object KafkaRaftServer {
           "If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
     }
 
-    (metaProperties, offlineDirs.toSeq)
+    // Load the bootstrap metadata file or, in the case of an upgrade from KRaft preview, bootstrap the
+    // metadata.version corresponding to a user-configured IBP.
+    val bootstrapMetadata = if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
+      BootstrapMetadata.load(Paths.get(config.metadataLogDir), config.interBrokerProtocolVersion)
+    } else {
+      BootstrapMetadata.load(Paths.get(config.metadataLogDir), MetadataVersion.IBP_3_0_IV0)
+    }
+
+    (metaProperties, bootstrapMetadata, offlineDirs.toSeq)
   }
 
   val configSchema = new KafkaConfigSchema(Map(
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b1273ed628..41ff21bb57 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -162,7 +162,7 @@ class KafkaServer(
 
   private var _featureChangeListener: FinalizedFeatureChangeListener = null
 
-  val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
+  val brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty()
   val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
 
   override def brokerState: BrokerState = _brokerState
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 54a777f67f..e8819e9c5b 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -118,9 +118,14 @@ class BrokerMetadataListener(
       }
       _publisher.foreach(publish)
 
+      // If we detected a change in metadata.version, generate a local snapshot
+      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
+        featuresDelta.metadataVersionChange().isPresent
+      }
+
       snapshotter.foreach { snapshotter =>
         _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-        if (shouldSnapshot()) {
+        if (shouldSnapshot() || metadataVersionChanged) {
           if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
             _bytesSinceLastSnapshot = 0L
           }
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index fb6bb61544..01418d2630 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.common.MetadataVersion
 
 import scala.collection.mutable
 
@@ -133,19 +134,27 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       // Publish the new metadata image to the metadata cache.
       metadataCache.setImage(newImage)
 
+      val metadataVersionLogMsg = newImage.features().metadataVersion() match {
+        case MetadataVersion.UNINITIALIZED => "un-initialized metadata.version"
+        case mv: MetadataVersion => s"metadata.version ${mv.featureLevel()}"
+      }
+
       if (_firstPublish) {
-        info(s"Publishing initial metadata at offset $highestOffsetAndEpoch.")
+        info(s"Publishing initial metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
 
         // If this is the first metadata update we are applying, initialize the managers
         // first (but after setting up the metadata cache).
         initializeManagers()
       } else if (isDebugEnabled) {
-        debug(s"Publishing metadata at offset $highestOffsetAndEpoch.")
+        debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
       }
 
       // Apply feature deltas.
       Option(delta.featuresDelta()).foreach { featuresDelta =>
         featureCache.update(featuresDelta, highestOffsetAndEpoch.offset)
+        featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
+          info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
+        }
       }
 
       // Apply topic deltas.
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 28377d297c..9b78faf6ed 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -19,48 +19,25 @@ package kafka.tools
 
 import java.io.PrintStream
 import java.nio.file.{Files, Paths}
-
 import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
 import kafka.utils.{Exit, Logging}
 import net.sourceforge.argparse4j.ArgumentParsers
 import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import net.sourceforge.argparse4j.inf.Namespace
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.server.common.MetadataVersion
 
 import scala.collection.mutable
 
 object StorageTool extends Logging {
   def main(args: Array[String]): Unit = {
     try {
-      val parser = ArgumentParsers.
-        newArgumentParser("kafka-storage").
-        defaultHelp(true).
-        description("The Kafka storage tool.")
-      val subparsers = parser.addSubparsers().dest("command")
-
-      val infoParser = subparsers.addParser("info").
-        help("Get information about the Kafka log directories on this node.")
-      val formatParser = subparsers.addParser("format").
-        help("Format the Kafka log directories on this node.")
-      subparsers.addParser("random-uuid").help("Print a random UUID.")
-      List(infoParser, formatParser).foreach(parser => {
-        parser.addArgument("--config", "-c").
-          action(store()).
-          required(true).
-          help("The Kafka configuration file to use.")
-      })
-      formatParser.addArgument("--cluster-id", "-t").
-        action(store()).
-        required(true).
-        help("The cluster ID to use.")
-      formatParser.addArgument("--ignore-formatted", "-g").
-        action(storeTrue())
-
-      val namespace = parser.parseArgsOrFail(args)
+      val namespace = parseArguments(args)
       val command = namespace.getString("command")
       val config = Option(namespace.getString("config")).flatMap(
         p => Some(new KafkaConfig(Utils.loadProps(p))))
-
       command match {
         case "info" =>
           val directories = configToLogDirectories(config.get)
@@ -70,13 +47,17 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
+          val metadataVersion = getMetadataVersion(namespace)
+          if (!metadataVersion.isKRaftSupported) {
+            throw new TerseFailure(s"Must specify a metadata version of at least 1.")
+          }
           val metaProperties = buildMetadataProperties(clusterId, config.get)
           val ignoreFormatted = namespace.getBoolean("ignore_formatted")
           if (!configToSelfManagedMode(config.get)) {
             throw new TerseFailure("The kafka configuration file appears to be for " +
               "a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
           }
-          Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted ))
+          Exit.exit(formatCommand(System.out, directories, metaProperties, metadataVersion, ignoreFormatted))
 
         case "random-uuid" =>
           System.out.println(Uuid.randomUuid)
@@ -92,6 +73,37 @@ object StorageTool extends Logging {
     }
   }
 
+  def parseArguments(args: Array[String]): Namespace = {
+    val parser = ArgumentParsers.
+      newArgumentParser("kafka-storage").
+      defaultHelp(true).
+      description("The Kafka storage tool.")
+    val subparsers = parser.addSubparsers().dest("command")
+
+    val infoParser = subparsers.addParser("info").
+      help("Get information about the Kafka log directories on this node.")
+    val formatParser = subparsers.addParser("format").
+      help("Format the Kafka log directories on this node.")
+    subparsers.addParser("random-uuid").help("Print a random UUID.")
+    List(infoParser, formatParser).foreach(parser => {
+      parser.addArgument("--config", "-c").
+        action(store()).
+        required(true).
+        help("The Kafka configuration file to use.")
+    })
+    formatParser.addArgument("--cluster-id", "-t").
+      action(store()).
+      required(true).
+      help("The cluster ID to use.")
+    formatParser.addArgument("--ignore-formatted", "-g").
+      action(storeTrue())
+    formatParser.addArgument("--metadata-version", "-v").
+      action(store()).
+      help(s"The initial metadata.version to use. Default is (${MetadataVersion.latest().featureLevel()}).")
+
+    parser.parseArgsOrFail(args)
+  }
+
   def configToLogDirectories(config: KafkaConfig): Seq[String] = {
     val directories = new mutable.TreeSet[String]
     directories ++= config.logDirs
@@ -101,6 +113,12 @@ object StorageTool extends Logging {
 
   def configToSelfManagedMode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
 
+  def getMetadataVersion(namespace: Namespace): MetadataVersion = {
+    Option(namespace.getString("metadata_version")).
+      map(mv => MetadataVersion.fromFeatureLevel(mv.toShort)).
+      getOrElse(MetadataVersion.latest())
+  }
+
   def infoCommand(stream: PrintStream, selfManagedMode: Boolean, directories: Seq[String]): Int = {
     val problems = new mutable.ArrayBuffer[String]
     val foundDirectories = new mutable.ArrayBuffer[String]
@@ -197,13 +215,16 @@ object StorageTool extends Logging {
       case e: Throwable => throw new TerseFailure(s"Cluster ID string $clusterIdStr " +
         s"does not appear to be a valid UUID: ${e.getMessage}")
     }
-    require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
+    if (config.nodeId < 0) {
+      throw new TerseFailure(s"The node.id must be set to a non-negative integer. We saw ${config.nodeId}")
+    }
     new MetaProperties(effectiveClusterId.toString, config.nodeId)
   }
 
   def formatCommand(stream: PrintStream,
                     directories: Seq[String],
                     metaProperties: MetaProperties,
+                    metadataVersion: MetadataVersion,
                     ignoreFormatted: Boolean): Int = {
     if (directories.isEmpty) {
       throw new TerseFailure("No log directories found in the configuration.")
@@ -231,6 +252,10 @@ object StorageTool extends Logging {
       val metaPropertiesPath = Paths.get(directory, "meta.properties")
       val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
       checkpoint.write(metaProperties.toProperties)
+
+      val bootstrapMetadata = BootstrapMetadata.create(metadataVersion)
+      BootstrapMetadata.write(bootstrapMetadata, Paths.get(directory))
+
       stream.println(s"Formatting ${directory}")
     })
     0
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 56104df821..a72784c469 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -90,7 +90,7 @@ class TestRaftServer(
       time,
       metrics,
       Some(threadNamePrefix),
-      CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters))
+      CompletableFuture.completedFuture(RaftConfig.parseVoterConnections(config.quorumVoters)),
     )
 
     workloadGenerator = new RaftWorkloadGenerator(
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
index 20b74cf432..5830959283 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -19,6 +19,7 @@ package kafka.test;
 
 import kafka.test.annotation.Type;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.io.File;
 import java.util.HashMap;
@@ -41,7 +42,7 @@ public class ClusterConfig {
     private final SecurityProtocol securityProtocol;
     private final String listenerName;
     private final File trustStoreFile;
-    private final String ibp;
+    private final MetadataVersion metadataVersion;
 
     private final Properties serverProperties = new Properties();
     private final Properties producerProperties = new Properties();
@@ -53,7 +54,7 @@ public class ClusterConfig {
 
     ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
                   SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
-                  String ibp) {
+                  MetadataVersion metadataVersion) {
         this.type = type;
         this.brokers = brokers;
         this.controllers = controllers;
@@ -62,7 +63,7 @@ public class ClusterConfig {
         this.securityProtocol = securityProtocol;
         this.listenerName = listenerName;
         this.trustStoreFile = trustStoreFile;
-        this.ibp = ibp;
+        this.metadataVersion = metadataVersion;
     }
 
     public Type clusterType() {
@@ -121,8 +122,8 @@ public class ClusterConfig {
         return Optional.ofNullable(trustStoreFile);
     }
 
-    public Optional<String> ibp() {
-        return Optional.ofNullable(ibp);
+    public Optional<MetadataVersion> metadataVersion() {
+        return Optional.ofNullable(metadataVersion);
     }
 
     public Properties brokerServerProperties(int brokerId) {
@@ -130,16 +131,16 @@ public class ClusterConfig {
     }
 
     public Map<String, String> nameTags() {
-        Map<String, String> tags = new LinkedHashMap<>(3);
+        Map<String, String> tags = new LinkedHashMap<>(4);
         name().ifPresent(name -> tags.put("Name", name));
-        ibp().ifPresent(ibp -> tags.put("IBP", ibp));
+        metadataVersion().ifPresent(mv -> tags.put("MetadataVersion", mv.toString()));
         tags.put("Security", securityProtocol.name());
         listenerName().ifPresent(listener -> tags.put("Listener", listener));
         return tags;
     }
 
     public ClusterConfig copyOf() {
-        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, ibp);
+        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion);
         copy.serverProperties.putAll(serverProperties);
         copy.producerProperties.putAll(producerProperties);
         copy.consumerProperties.putAll(consumerProperties);
@@ -165,7 +166,7 @@ public class ClusterConfig {
         private SecurityProtocol securityProtocol;
         private String listenerName;
         private File trustStoreFile;
-        private String ibp;
+        private MetadataVersion metadataVersion;
 
         Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
             this.type = type;
@@ -215,13 +216,13 @@ public class ClusterConfig {
             return this;
         }
 
-        public Builder ibp(String ibp) {
-            this.ibp = ibp;
+        public Builder metadataVersion(MetadataVersion metadataVersion) {
+            this.metadataVersion = metadataVersion;
             return this;
         }
 
         public ClusterConfig build() {
-            return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, ibp);
+            return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion);
         }
     }
 }
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 099d93280d..a7052857c3 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -18,11 +18,13 @@
 package kafka.test;
 
 import kafka.network.SocketServer;
+import kafka.server.BrokerFeatures;
 import kafka.test.annotation.ClusterTest;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.network.ListenerName;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -95,6 +97,11 @@ public interface ClusterInstance {
      */
     SocketServer anyControllerSocketServer();
 
+    /**
+     * Return a mapping of the underlying broker IDs to their supported features
+     */
+    Map<Integer, BrokerFeatures> brokerFeatures();
+
     /**
      * The underlying object which is responsible for setting up and tearing down the cluster.
      */
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index 11336ab87a..b83df127f5 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -18,6 +18,7 @@
 package kafka.test.annotation;
 
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.TestTemplate;
 
 import java.lang.annotation.Documented;
@@ -40,6 +41,6 @@ public @interface ClusterTest {
     String name() default "";
     SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
     String listener() default "";
-    String ibp() default "";
+    MetadataVersion metadataVersion() default MetadataVersion.UNINITIALIZED;
     ClusterConfigProperty[] serverProperties() default {};
 }
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index 293f00b035..f0c1d9bbda 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -25,6 +25,7 @@ import kafka.test.annotation.ClusterTemplate;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTests;
 import kafka.test.annotation.Type;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
 import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
@@ -194,8 +195,8 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
             properties.put(property.key(), property.value());
         }
 
-        if (!annot.ibp().isEmpty()) {
-            builder.ibp(annot.ibp());
+        if (!annot.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+            builder.metadataVersion(annot.metadataVersion());
         }
 
         ClusterConfig config = builder.build();
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 711143785c..f0ca98a5f2 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -18,6 +18,7 @@
 package kafka.test.junit;
 
 import kafka.network.SocketServer;
+import kafka.server.BrokerFeatures;
 import kafka.server.BrokerServer;
 import kafka.server.ControllerServer;
 import kafka.test.ClusterConfig;
@@ -29,6 +30,7 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
 import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
 import org.junit.jupiter.api.extension.Extension;
@@ -38,6 +40,7 @@ import scala.compat.java8.OptionConverters;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -83,6 +86,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
         return Arrays.asList(
             (BeforeTestExecutionCallback) context -> {
                 TestKitNodes nodes = new TestKitNodes.Builder().
+                        setBootstrapMetadataVersion(clusterConfig.metadataVersion().orElse(MetadataVersion.latest())).
                         setNumBrokerNodes(clusterConfig.numBrokers()).
                         setNumControllerNodes(clusterConfig.numControllers()).build();
                 nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
@@ -168,6 +172,14 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
                 .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
         }
 
+        @Override
+        public Map<Integer, BrokerFeatures> brokerFeatures() {
+            return brokers().collect(Collectors.toMap(
+                brokerServer -> brokerServer.config().nodeId(),
+                BrokerServer::brokerFeatures
+            ));
+        }
+
         @Override
         public ClusterType clusterType() {
             return ClusterType.RAFT;
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 68ec041053..02f21906ed 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -19,6 +19,7 @@ package kafka.test.junit;
 
 import kafka.api.IntegrationTestHarness;
 import kafka.network.SocketServer;
+import kafka.server.BrokerFeatures;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.test.ClusterConfig;
@@ -41,6 +42,7 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -106,7 +108,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
                     @Override
                     public Properties serverConfig() {
                         Properties props = clusterConfig.serverProperties();
-                        clusterConfig.ibp().ifPresent(ibp -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), ibp));
+                        clusterConfig.metadataVersion().ifPresent(mv -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), mv.version()));
                         return props;
                     }
 
@@ -237,6 +239,14 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
                 .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
         }
 
+        @Override
+        public Map<Integer, BrokerFeatures> brokerFeatures() {
+            return servers().collect(Collectors.toMap(
+                brokerServer -> brokerServer.config().nodeId(),
+                KafkaServer::brokerFeatures
+            ));
+        }
+
         @Override
         public ClusterType clusterType() {
             return ClusterType.ZK;
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 1372006f19..1924579e17 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -34,10 +34,12 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.BootstrapMetadata;
 import org.apache.kafka.controller.Controller;
 import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.raft.RaftConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -173,6 +175,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     String threadNamePrefix = String.format("controller%d_", node.id());
                     MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
                     TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
+                    BootstrapMetadata bootstrapMetadata = BootstrapMetadata.create(nodes.bootstrapMetadataVersion());
                     KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
                         metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
                         Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
@@ -184,7 +187,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         new Metrics(),
                         Option.apply(threadNamePrefix),
                         connectFutureManager.future,
-                        KafkaRaftServer.configSchema()
+                        KafkaRaftServer.configSchema(),
+                        raftManager.apiVersions(),
+                        bootstrapMetadata
                     );
                     controllers.put(node.id(), controller);
                     controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
@@ -335,6 +340,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     StorageTool.formatCommand(out,
                             JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
                             properties,
+                            MetadataVersion.IBP_3_0_IV0,
                             false);
                 } finally {
                     for (String line : stream.toString().split(String.format("%n"))) {
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index d52b800233..f91e62d179 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -20,6 +20,7 @@ package kafka.testkit;
 import kafka.server.MetaProperties;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -33,6 +34,7 @@ import java.util.TreeMap;
 public class TestKitNodes {
     public static class Builder {
         private Uuid clusterId = null;
+        private MetadataVersion bootstrapMetadataVersion = null;
         private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
         private final NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
 
@@ -41,6 +43,11 @@ public class TestKitNodes {
             return this;
         }
 
+        public Builder setBootstrapMetadataVersion(MetadataVersion metadataVersion) {
+            this.bootstrapMetadataVersion = metadataVersion;
+            return this;
+        }
+
         public Builder addNodes(TestKitNode[] nodes) {
             for (TestKitNode node : nodes) {
                 addNode(node);
@@ -103,18 +110,24 @@ public class TestKitNodes {
             if (clusterId == null) {
                 clusterId = Uuid.randomUuid();
             }
-            return new TestKitNodes(clusterId, controllerNodes, brokerNodes);
+            if (bootstrapMetadataVersion == null) {
+                bootstrapMetadataVersion = MetadataVersion.latest();
+            }
+            return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
         }
     }
 
     private final Uuid clusterId;
+    private final MetadataVersion bootstrapMetadataVersion;
     private final NavigableMap<Integer, ControllerNode> controllerNodes;
     private final NavigableMap<Integer, BrokerNode> brokerNodes;
 
     private TestKitNodes(Uuid clusterId,
+                         MetadataVersion bootstrapMetadataVersion,
                          NavigableMap<Integer, ControllerNode> controllerNodes,
                          NavigableMap<Integer, BrokerNode> brokerNodes) {
         this.clusterId = clusterId;
+        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
         this.controllerNodes = controllerNodes;
         this.brokerNodes = brokerNodes;
     }
@@ -123,6 +136,10 @@ public class TestKitNodes {
         return clusterId;
     }
 
+    public MetadataVersion bootstrapMetadataVersion() {
+        return bootstrapMetadataVersion;
+    }
+
     public Map<Integer, ControllerNode> controllerNodes() {
         return controllerNodes;
     }
@@ -161,7 +178,7 @@ public class TestKitNodes {
                 node.incarnationId(), absolutize(baseDirectory, node.metadataDirectory()),
                 absolutize(baseDirectory, node.logDataDirectories()), node.propertyOverrides()));
         }
-        return new TestKitNodes(clusterId, newControllerNodes, newBrokerNodes);
+        return new TestKitNodes(clusterId, bootstrapMetadataVersion, newControllerNodes, newBrokerNodes);
     }
 
     private static List<String> absolutize(String base, Collection<String> directories) {
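
For context, a minimal sketch of how a test pins the bootstrap metadata.version via the new builder method (mirroring KRaftClusterTest below; the chosen version is arbitrary):

    import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
    import org.apache.kafka.server.common.MetadataVersion

    // Build and start a one-broker, one-controller KRaft test cluster that is
    // bootstrapped at an explicit metadata.version instead of the default latest().
    val cluster = new KafkaClusterTestKit.Builder(
      new TestKitNodes.Builder().
        setBootstrapMetadataVersion(MetadataVersion.IBP_3_2_IV0).
        setNumBrokerNodes(1).
        setNumControllerNodes(1).build()).build()
    cluster.format()
    cluster.startup()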
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index be9f159b86..3b97ee8398 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.message.InitProducerIdRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
+import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
@@ -43,9 +44,9 @@ class ProducerIdsIntegrationTest {
   }
 
   @ClusterTests(Array(
-    new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"),
-    new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0"),
-    new ClusterTest(clusterType = Type.KRAFT, brokers = 3, ibp = "3.0-IV0")
+    new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
+    new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
+    new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0)
   ))
   def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
     verifyUniqueIds(clusterInstance)
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index c62dbd5284..69688da4f4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -30,18 +30,19 @@ import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, Descr
 import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Tag, Test, Timeout}
+
 import java.util
-import java.util.concurrent.ExecutionException
 import java.util.{Arrays, Collections, Optional}
-
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type
 import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.server.common.MetadataVersion
 import org.slf4j.LoggerFactory
 
 import scala.annotation.nowarn
 import scala.collection.mutable
+import scala.concurrent.ExecutionException
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
 
@@ -69,8 +70,8 @@ class KRaftClusterTest {
   def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
-        setNumBrokerNodes(3).
-        setNumControllerNodes(3).build()).build()
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
     try {
       cluster.format()
       cluster.startup()
@@ -292,6 +293,17 @@ class KRaftClusterTest {
     }
   }
 
+  @Test
+  def testCreateClusterInvalidMetadataVersion(): Unit = {
+    assertThrows(classOf[IllegalArgumentException], () => {
+      new KafkaClusterTestKit.Builder(
+        new TestKitNodes.Builder().
+          setBootstrapMetadataVersion(MetadataVersion.IBP_2_7_IV0).
+          setNumBrokerNodes(1).
+          setNumControllerNodes(1).build()).build()
+    })
+  }
+
   private def doOnStartedKafkaCluster(numControllerNodes: Int = 1,
                                       numBrokerNodes: Int,
                                       brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, String])
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
new file mode 100644
index 0000000000..b671e5d5e3
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTests, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
+import org.apache.kafka.clients.admin.{FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.server.common.MetadataVersion
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class MetadataVersionIntegrationTest {
+  @ClusterTests(value = Array(
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV0),
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_1_IV0),
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_2_IV0)
+  ))
+  def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
+    val admin = clusterInstance.createAdminClient()
+    val describeResult = admin.describeFeatures()
+    val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
+    assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
+    assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
+
+    // Update to new version
+    val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
+    val updateResult = admin.updateFeatures(
+      Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
+    updateResult.all().get()
+
+    // Verify that new version is visible on broker
+    TestUtils.waitUntilTrue(() => {
+      val describeResult2 = admin.describeFeatures()
+      val ff2 = describeResult2.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
+      ff2.minVersionLevel() == updateVersion && ff2.maxVersionLevel() == updateVersion
+    }, "Never saw metadata.version increase on broker")
+  }
+
+  @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+  def testUpgradeSameVersion(clusterInstance: ClusterInstance): Unit = {
+    val admin = clusterInstance.createAdminClient()
+    val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
+    val updateResult = admin.updateFeatures(
+      Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
+    updateResult.all().get()
+  }
+
+  @ClusterTest(clusterType = Type.KRAFT)
+  def testDefaultIsLatestVersion(clusterInstance: ClusterInstance): Unit = {
+    val admin = clusterInstance.createAdminClient()
+    val describeResult = admin.describeFeatures()
+    val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
+    assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel())
+    assertEquals(ff.maxVersionLevel(), MetadataVersion.latest().featureLevel())
+  }
+}
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 9f00c0564c..b4ec1dbc87 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -22,7 +22,6 @@ import java.net.InetSocketAddress
 import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.CompletableFuture
-
 import javax.security.auth.login.Configuration
 import kafka.raft.KafkaRaftManager
 import kafka.tools.StorageTool
@@ -33,15 +32,18 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Exit, Time}
+import org.apache.kafka.controller.BootstrapMetadata
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
 
+import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable}
+import scala.jdk.CollectionConverters._
 
 trait QuorumImplementation {
   def createBroker(config: KafkaConfig,
@@ -114,6 +116,10 @@ abstract class QuorumTestHarness extends Logging {
     Seq(new Properties())
   }
 
+  protected def metadataVersion: MetadataVersion = MetadataVersion.latest()
+
+  val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
+
   private var implementation: QuorumImplementation = null
 
   def isKRaftTest(): Boolean = implementation.isInstanceOf[KRaftQuorumImplementation]
@@ -227,7 +233,7 @@ abstract class QuorumTestHarness extends Logging {
     var out: PrintStream = null
     try {
       out = new PrintStream(stream)
-      if (StorageTool.formatCommand(out, directories, metaProperties, false) != 0) {
+      if (StorageTool.formatCommand(out, directories, metaProperties, metadataVersion, ignoreFormatted = false) != 0) {
         throw new RuntimeException(stream.toString())
       }
       debug(s"Formatted storage directory(ies) ${directories}")
@@ -282,6 +288,8 @@ abstract class QuorumTestHarness extends Logging {
         threadNamePrefix = Option(threadNamePrefix),
         controllerQuorumVotersFuture = controllerQuorumVotersFuture,
         configSchema = KafkaRaftServer.configSchema,
+        raftApiVersions = raftManager.apiVersions,
+        bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava),
       )
       controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
         if (e != null) {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index f237335bac..a77b42e46e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -163,7 +163,7 @@ class ControllerChannelManagerTest {
   def testLeaderAndIsrInterBrokerProtocolVersion(): Unit = {
     testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.LEADER_AND_ISR.latestVersion)
 
-    for (metadataVersion <- MetadataVersion.VALUES) {
+    for (metadataVersion <- MetadataVersion.VERSIONS) {
       val leaderAndIsrRequestVersion: Short =
         if (metadataVersion.isAtLeast(IBP_3_2_IV0)) 6
         else if (metadataVersion.isAtLeast(IBP_2_8_IV1)) 5
@@ -380,7 +380,7 @@ class ControllerChannelManagerTest {
   def testUpdateMetadataInterBrokerProtocolVersion(): Unit = {
     testUpdateMetadataFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.UPDATE_METADATA.latestVersion)
 
-    for (metadataVersion <- MetadataVersion.VALUES) {
+    for (metadataVersion <- MetadataVersion.VERSIONS) {
       val updateMetadataRequestVersion: Short =
         if (metadataVersion.isAtLeast(IBP_2_8_IV1)) 7
         else if (metadataVersion.isAtLeast(IBP_2_4_IV1)) 6
@@ -474,7 +474,7 @@ class ControllerChannelManagerTest {
 
   @Test
   def testStopReplicaRequestsWhileTopicQueuedForDeletion(): Unit = {
-    for (metadataVersion <- MetadataVersion.VALUES) {
+    for (metadataVersion <- MetadataVersion.VERSIONS) {
       testStopReplicaRequestsWhileTopicQueuedForDeletion(metadataVersion)
     }
   }
@@ -521,7 +521,7 @@ class ControllerChannelManagerTest {
 
   @Test
   def testStopReplicaRequestsWhileTopicDeletionStarted(): Unit = {
-    for (metadataVersion <- MetadataVersion.VALUES) {
+    for (metadataVersion <- MetadataVersion.VERSIONS) {
       testStopReplicaRequestsWhileTopicDeletionStarted(metadataVersion)
     }
   }
@@ -576,7 +576,7 @@ class ControllerChannelManagerTest {
 
   @Test
   def testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(): Unit = {
-    for (metadataVersion <- MetadataVersion.VALUES) {
+    for (metadataVersion <- MetadataVersion.VERSIONS) {
       testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(metadataVersion)
     }
   }
@@ -626,7 +626,7 @@ class ControllerChannelManagerTest {
     testMixedDeleteAndNotDeleteStopReplicaRequests(MetadataVersion.latest,
       ApiKeys.STOP_REPLICA.latestVersion)
 
-    for (metadataVersion <- MetadataVersion.VALUES) {
+    for (metadataVersion <- MetadataVersion.VERSIONS) {
       if (metadataVersion.isLessThan(IBP_2_2_IV0))
         testMixedDeleteAndNotDeleteStopReplicaRequests(metadataVersion, 0.toShort)
       else if (metadataVersion.isLessThan(IBP_2_4_IV1))
@@ -775,7 +775,7 @@ class ControllerChannelManagerTest {
   def testStopReplicaInterBrokerProtocolVersion(): Unit = {
     testStopReplicaFollowsInterBrokerProtocolVersion(MetadataVersion.latest, ApiKeys.STOP_REPLICA.latestVersion)
 
-    for (metadataVersion <- MetadataVersion.VALUES) {
+    for (metadataVersion <- MetadataVersion.VERSIONS) {
       if (metadataVersion.isLessThan(IBP_2_2_IV0))
         testStopReplicaFollowsInterBrokerProtocolVersion(metadataVersion, 0.toShort)
       else if (metadataVersion.isLessThan(IBP_2_4_IV1))
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 250f22a24f..688d6e83b0 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1055,7 +1055,7 @@ class GroupMetadataManagerTest {
     val protocol = "range"
     val memberId = "memberId"
 
-    for (metadataVersion <- MetadataVersion.VALUES) {
+    for (metadataVersion <- MetadataVersion.VERSIONS) {
       val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, metadataVersion = metadataVersion)
 
       val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
@@ -2276,7 +2276,7 @@ class GroupMetadataManagerTest {
       assertEquals(expectedLeaderEpoch, deserializedOffsetAndMetadata.leaderEpoch)
     }
 
-    for (version <- MetadataVersion.VALUES) {
+    for (version <- MetadataVersion.VERSIONS) {
       val expectedSchemaVersion = version match {
         case v if v.isLessThan(IBP_2_1_IV0) => 1
         case v if v.isLessThan(IBP_2_1_IV1) => 2
@@ -2307,7 +2307,7 @@ class GroupMetadataManagerTest {
       assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
     }
 
-    for (version <- MetadataVersion.VALUES)
+    for (version <- MetadataVersion.VERSIONS)
       verifySerde(version)
   }
 
@@ -2335,7 +2335,7 @@ class GroupMetadataManagerTest {
       assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
     }
 
-    for (version <- MetadataVersion.VALUES)
+    for (version <- MetadataVersion.VERSIONS)
       verifySerde(version)
   }
 
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index a7a9519455..f8fac503d6 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -18,7 +18,6 @@ package kafka.raft
 
 import java.util.concurrent.CompletableFuture
 import java.util.Properties
-
 import kafka.raft.KafkaRaftManager.RaftIoThread
 import kafka.server.{KafkaConfig, MetaProperties}
 import kafka.tools.TestRaftServer.ByteArraySerde
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 530bc235b3..99d593ede6 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -77,7 +77,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
       ApiVersionsResponse.intersectForwardableApis(
         ApiMessageType.ListenerType.BROKER,
         RecordVersion.current,
-        new NodeApiVersions(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions()
+        NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions()
       )
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
index 10d69e2cd6..eab3928483 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
@@ -27,7 +27,7 @@ class BrokerFeaturesTest {
 
   @Test
   def testEmpty(): Unit = {
-    assertTrue(BrokerFeatures.createDefault().supportedFeatures.empty)
+    assertTrue(BrokerFeatures.createEmpty().supportedFeatures.empty)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index dd3e49d4d1..1a0fac443c 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -73,7 +73,7 @@ class BrokerLifecycleManagerTest {
     val metadata = new Metadata(1000, 1000, new LogContext(), new ClusterResourceListeners())
     val mockClient = new MockClient(time, metadata)
     val controllerNodeProvider = new SimpleControllerNodeProvider()
-    val nodeApiVersions = new NodeApiVersions(Seq(BROKER_REGISTRATION, BROKER_HEARTBEAT).map {
+    val nodeApiVersions = NodeApiVersions.create(Seq(BROKER_REGISTRATION, BROKER_HEARTBEAT).map {
       apiKey => new ApiVersion().setApiKey(apiKey.id).
         setMinVersion(apiKey.oldestVersion()).setMaxVersion(apiKey.latestVersion())
     }.toList.asJava)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e879a7f6ff..36dff7a679 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -698,8 +698,8 @@ class KafkaConfigTest {
       KafkaConfig.fromProps(props)
     }
 
-    MetadataVersion.VALUES.foreach { interBrokerVersion =>
-      MetadataVersion.VALUES.foreach { messageFormatVersion =>
+    MetadataVersion.VERSIONS.foreach { interBrokerVersion =>
+      MetadataVersion.VERSIONS.foreach { messageFormatVersion =>
         if (interBrokerVersion.highestSupportedRecordVersion.value >= messageFormatVersion.highestSupportedRecordVersion.value) {
           val config = buildConfig(interBrokerVersion, messageFormatVersion)
           assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index b4dac22c6c..f997455f0b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -19,11 +19,11 @@ package kafka.server
 import java.io.File
 import java.nio.file.Files
 import java.util.Properties
-
 import kafka.common.{InconsistentBrokerMetadataException, InconsistentNodeIdException}
 import kafka.log.UnifiedLog
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.controller.BootstrapMetadata
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -44,7 +44,7 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
     configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
 
-    val (loadedMetaProperties, offlineDirs) =
+    val (loadedMetaProperties, _, offlineDirs) =
       invokeLoadMetaProperties(metaProperties, configProperties)
 
     assertEquals(metaProperties, loadedMetaProperties)
@@ -72,7 +72,7 @@ class KafkaRaftServerTest {
   private def invokeLoadMetaProperties(
     metaProperties: MetaProperties,
     configProperties: Properties
-  ): (MetaProperties, collection.Seq[String]) = {
+  ): (MetaProperties, BootstrapMetadata, collection.Seq[String]) = {
     val tempLogDir = TestUtils.tempDirectory()
     try {
       writeMetaProperties(tempLogDir, metaProperties)
@@ -159,7 +159,7 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     val config = KafkaConfig.fromProps(configProperties)
 
-    val (loadedProperties, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
+    val (loadedProperties, _, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
     assertEquals(nodeId, loadedProperties.nodeId)
     assertEquals(Seq(invalidDir.getAbsolutePath), offlineDirs)
   }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 079c0d5635..c8c7a89df7 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -50,7 +50,7 @@ import scala.collection.Seq
 class EpochDrivenReplicationProtocolAcceptanceTest extends QuorumTestHarness with Logging {
 
   // Set this to IBP_0_11_0_IV1 to demonstrate the tests failing in the pre-KIP-101 case
-  val metadataVersion = MetadataVersion.latest
+  override def metadataVersion = MetadataVersion.latest
   val topic = "topic1"
   val msg = new Array[Byte](1000)
   val msgBigger = new Array[Byte](10000)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 0242c33dab..9f75031341 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -26,6 +26,7 @@ import java.util.Properties
 import kafka.server.{KafkaConfig, MetaProperties}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.{Test, Timeout}
 
@@ -160,11 +161,11 @@ Found problem:
         clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
       val stream = new ByteArrayOutputStream()
       assertEquals(0, StorageTool.
-        formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false))
+        formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = false))
       assertEquals("Formatting %s%n".format(tempDir), stream.toString())
 
       try assertEquals(1, StorageTool.
-        formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, false)) catch {
+        formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = false)) catch {
         case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " +
           "formatted. Use --ignore-formatted to ignore this directory and format the " +
           "others.", e.getMessage)
@@ -172,7 +173,7 @@ Found problem:
 
       val stream2 = new ByteArrayOutputStream()
       assertEquals(0, StorageTool.
-        formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, true))
+        formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, MetadataVersion.latest(), ignoreFormatted = true))
       assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString())
     } finally Utils.delete(tempDir)
   }
@@ -185,4 +186,19 @@ Found problem:
         "16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure],
           () => StorageTool.buildMetadataProperties("invalid", config)).getMessage)
   }
+
+  @Test
+  def testDefaultMetadataVersion(): Unit = {
+    var namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
+    var mv = StorageTool.getMetadataVersion(namespace)
+    assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
+      "Expected the default metadata.version to be the latest version")
+
+    namespace = StorageTool.parseArguments(Array("format", "-c", "config.props",
+      "--metadata-version", MetadataVersion.latest().featureLevel().toString, "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
+    mv = StorageTool.getMetadataVersion(namespace)
+    assertEquals(MetadataVersion.latest().featureLevel(), mv.featureLevel(),
+      "Expected the default metadata.version to be the latest version")
+
+  }
 }
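
For reference, a minimal sketch of the argument-parsing path these tests exercise (the config path and cluster id are placeholder values):

    import kafka.tools.StorageTool
    import org.apache.kafka.server.common.MetadataVersion

    // Parse "format" arguments and resolve which metadata.version to bootstrap with.
    val namespace = StorageTool.parseArguments(Array(
      "format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
      "--metadata-version", MetadataVersion.IBP_3_2_IV0.featureLevel().toString))
    val metadataVersion = StorageTool.getMetadataVersion(namespace)
    // With --metadata-version omitted, getMetadataVersion falls back to MetadataVersion.latest().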
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
new file mode 100644
index 0000000000..fa031c525f
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.metadata.util.SnapshotFileReader;
+import org.apache.kafka.metadata.util.SnapshotFileWriter;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+
+
+/**
+ * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used, and
+ * its format is the same as a KRaft snapshot.
+ */
+public class BootstrapMetadata {
+    private static final Logger log = LoggerFactory.getLogger(BootstrapMetadata.class);
+
+    public static final String BOOTSTRAP_FILE = "bootstrap.checkpoint";
+
+    private final MetadataVersion metadataVersion;
+
+    private final List<ApiMessageAndVersion> records;
+
+    BootstrapMetadata(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
+        this.metadataVersion = metadataVersion;
+        this.records = Collections.unmodifiableList(records);
+    }
+
+    public MetadataVersion metadataVersion() {
+        return this.metadataVersion;
+    }
+
+    public List<ApiMessageAndVersion> records() {
+        return records;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        BootstrapMetadata metadata = (BootstrapMetadata) o;
+        return metadataVersion == metadata.metadataVersion;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(metadataVersion);
+    }
+
+    @Override
+    public String toString() {
+        return "BootstrapMetadata{" +
+            "metadataVersion=" + metadataVersion +
+            '}';
+    }
+
+    /**
+     * A raft client listener that collects all of the commits and snapshots into a single,
+     * flat list of records.
+     */
+    private static class BootstrapListener implements RaftClient.Listener<ApiMessageAndVersion> {
+        private final List<ApiMessageAndVersion> records = new ArrayList<>();
+
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                while (reader.hasNext()) {
+                    Batch<ApiMessageAndVersion> batch = reader.next();
+                    records.addAll(batch.records());
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+            try {
+                while (reader.hasNext()) {
+                    Batch<ApiMessageAndVersion> batch = reader.next();
+                    for (ApiMessageAndVersion messageAndVersion : batch) {
+                        records.add(messageAndVersion);
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+        }
+    }
+
+    public static BootstrapMetadata create(MetadataVersion metadataVersion) {
+        return create(metadataVersion, new ArrayList<>());
+    }
+
+    public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
+        if (!metadataVersion.isKRaftSupported()) {
+            throw new IllegalArgumentException("Cannot create BootstrapMetadata with a non-KRaft metadata version.");
+        }
+        records.add(new ApiMessageAndVersion(
+            new FeatureLevelRecord()
+                .setName(MetadataVersion.FEATURE_NAME)
+                .setFeatureLevel(metadataVersion.featureLevel()),
+            FeatureLevelRecord.LOWEST_SUPPORTED_VERSION));
+
+        return new BootstrapMetadata(metadataVersion, records);
+    }
+
+    /**
+     * Load a bootstrap snapshot into a read-only bootstrap metadata object and return it.
+     *
+     * @param bootstrapDir  The directory from which to read the snapshot file.
+     * @param fallbackPreviewVersion  The metadata.version to bootstrap with if upgrading from a KRaft preview release
+     * @return              The read-only bootstrap metadata
+     * @throws Exception    If the snapshot file cannot be read, or contains no metadata.version record
+     */
+    public static BootstrapMetadata load(Path bootstrapDir, MetadataVersion fallbackPreviewVersion) throws Exception {
+        final Path bootstrapPath = bootstrapDir.resolve(BOOTSTRAP_FILE);
+
+        if (!Files.exists(bootstrapPath)) {
+            log.debug("Missing bootstrap file, this appears to be a KRaft preview cluster. Setting metadata.version to {}.",
+                fallbackPreviewVersion.featureLevel());
+            return BootstrapMetadata.create(fallbackPreviewVersion);
+        }
+
+        BootstrapListener listener = new BootstrapListener();
+        try (SnapshotFileReader reader = new SnapshotFileReader(bootstrapPath.toString(), listener)) {
+            reader.startup();
+            reader.caughtUpFuture().get();
+        } catch (ExecutionException e) {
+            throw new Exception("Failed to load snapshot", e.getCause());
+        }
+
+        Optional<FeatureLevelRecord> metadataVersionRecord = listener.records.stream()
+            .flatMap(message -> {
+                MetadataRecordType type = MetadataRecordType.fromId(message.message().apiKey());
+                if (!type.equals(MetadataRecordType.FEATURE_LEVEL_RECORD)) {
+                    return Stream.empty();
+                }
+                FeatureLevelRecord record = (FeatureLevelRecord) message.message();
+                if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+                    return Stream.of(record);
+                } else {
+                    return Stream.empty();
+                }
+            })
+            .findFirst();
+
+        if (metadataVersionRecord.isPresent()) {
+            return new BootstrapMetadata(MetadataVersion.fromFeatureLevel(metadataVersionRecord.get().featureLevel()), listener.records);
+        } else {
+            throw new RuntimeException("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
+        }
+    }
+
+    /**
+     * Write a set of bootstrap metadata to the bootstrap snapshot in a given directory
+     *
+     * @param metadata      The metadata to persist
+     * @param bootstrapDir  The directory in which to create the bootstrap snapshot
+     * @throws IOException  If the bootstrap file already exists or cannot be written
+     */
+    public static void write(BootstrapMetadata metadata, Path bootstrapDir) throws IOException {
+        final Path bootstrapPath = bootstrapDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE);
+        if (Files.exists(bootstrapPath)) {
+            throw new IOException("Cannot write metadata bootstrap file " + bootstrapPath +
+                ". File already already exists.");
+        }
+        try (SnapshotFileWriter bootstrapWriter = SnapshotFileWriter.open(bootstrapPath)) {
+            bootstrapWriter.append(metadata.records());
+        }
+    }
+}
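
As a usage sketch of the new class (the directory is a placeholder; write() refuses to overwrite an existing bootstrap.checkpoint file):

    import java.nio.file.Paths
    import org.apache.kafka.controller.BootstrapMetadata
    import org.apache.kafka.server.common.MetadataVersion

    // Create bootstrap metadata at the latest metadata.version and persist it.
    val metadata = BootstrapMetadata.create(MetadataVersion.latest())
    BootstrapMetadata.write(metadata, Paths.get("/tmp/metadata-log-dir"))

    // Load it back. The second argument is the fallback used when no bootstrap
    // file exists, i.e. a cluster formatted by a KRaft preview release.
    val loaded = BootstrapMetadata.load(Paths.get("/tmp/metadata-log-dir"), MetadataVersion.IBP_3_0_IV0)
    assert(loaded.metadataVersion() == MetadataVersion.latest())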
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 538e8a13eb..90dfb571b1 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -336,8 +336,7 @@ public class ClusterControlManager {
         heartbeatManager.register(brokerId, record.fenced());
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
-        records.add(new ApiMessageAndVersion(record,
-            REGISTER_BROKER_RECORD.highestSupportedVersion()));
+        records.add(new ApiMessageAndVersion(record, REGISTER_BROKER_RECORD.highestSupportedVersion()));
         return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
@@ -535,8 +534,7 @@ public class ClusterControlManager {
                 setEndPoints(endpoints).
                 setFeatures(features).
                 setRack(registration.rack().orElse(null)).
-                setFenced(registration.fenced()),
-                    REGISTER_BROKER_RECORD.highestSupportedVersion()));
+                setFenced(registration.fenced()), REGISTER_BROKER_RECORD.highestSupportedVersion()));
             return batch;
         }
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 307a0ce09d..52d3c5d521 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -25,7 +25,9 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.TreeMap;
+import java.util.function.Consumer;
 
 import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -33,10 +35,12 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineObject;
 import org.slf4j.Logger;
 
 import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
@@ -55,6 +59,11 @@ public class FeatureControlManager {
      */
     private final TimelineHashMap<String, Short> finalizedVersions;
 
+    /**
+     * The current metadata.version (UNINITIALIZED until a FeatureLevelRecord sets it).
+     */
+    private final TimelineObject<MetadataVersion> metadataVersion;
+
 
     FeatureControlManager(LogContext logContext,
                           QuorumFeatures quorumFeatures,
@@ -62,13 +71,15 @@ public class FeatureControlManager {
         this.log = logContext.logger(FeatureControlManager.class);
         this.quorumFeatures = quorumFeatures;
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.metadataVersion = new TimelineObject<>(snapshotRegistry, MetadataVersion.UNINITIALIZED);
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
-            Map<String, Short> updates,
-            Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
-            Map<Integer, Map<String, VersionRange>> brokerFeatures,
-            boolean validateOnly) {
+        Map<String, Short> updates,
+        Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
+        Map<Integer, Map<String, VersionRange>> brokerFeatures,
+        boolean validateOnly
+    ) {
         TreeMap<String, ApiError> results = new TreeMap<>();
         List<ApiMessageAndVersion> records = new ArrayList<>();
         for (Entry<String, Short> entry : updates.entrySet()) {
@@ -83,9 +94,28 @@ public class FeatureControlManager {
         }
     }
 
-    boolean canSupportVersion(String featureName, short versionRange) {
-        return quorumFeatures.localSupportedFeature(featureName)
-            .filter(localRange -> localRange.contains(versionRange))
+    ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short initVersion) {
+        if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+            return ControllerResult.atomicOf(
+                Collections.emptyList(),
+                Collections.singletonMap(
+                    MetadataVersion.FEATURE_NAME,
+                    new ApiError(Errors.INVALID_UPDATE_VERSION,
+                        "Cannot initialize metadata.version to " + initVersion + " since it has already been " +
+                            "initialized to " + metadataVersion().featureLevel() + ".")
+            ));
+        }
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        ApiError result = updateMetadataVersion(initVersion, false, records::add);
+        return ControllerResult.atomicOf(records, Collections.singletonMap(MetadataVersion.FEATURE_NAME, result));
+    }
+
+    /**
+     * Test whether the quorum can support the given feature at the given version level.
+     */
+    boolean canSupportVersion(String featureName, short version) {
+        return quorumFeatures.quorumSupportedFeature(featureName)
+            .filter(versionRange -> versionRange.contains(version))
             .isPresent();
     }
 
@@ -93,84 +123,189 @@ public class FeatureControlManager {
         return quorumFeatures.localSupportedFeature(featureName).isPresent();
     }
 
-    private ApiError updateFeature(String featureName,
-                                   short newVersion,
-                                   FeatureUpdate.UpgradeType upgradeType,
-                                   Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
-                                   List<ApiMessageAndVersion> records) {
+    MetadataVersion metadataVersion() {
+        return metadataVersion.get();
+    }
+
+    private ApiError updateFeature(
+        String featureName,
+        short newVersion,
+        FeatureUpdate.UpgradeType upgradeType,
+        Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
+        List<ApiMessageAndVersion> records
+    ) {
         if (!featureExists(featureName)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given feature.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature.");
         }
 
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given upgrade type.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given upgrade type.");
         }
 
-        final Short currentVersion = finalizedVersions.get(featureName);
+        final Short currentVersion;
+        if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+            currentVersion = metadataVersion.get().featureLevel();
+        } else {
+            currentVersion = finalizedVersions.get(featureName);
+        }
 
         if (newVersion <= 0) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The upper value for the new range cannot be less than 1.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "A feature version cannot be less than 1.");
         }
 
         if (!canSupportVersion(featureName, newVersion)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature range.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The quorum does not support the given feature version.");
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
-            if (brokerRange == null || !brokerRange.contains(newVersion)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
+            if (brokerRange == null) {
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Broker " + brokerEntry.getKey() + " does not support this feature.");
+            } else if (!brokerRange.contains(newVersion)) {
+                return invalidUpdateVersion(featureName, newVersion,
                     "Broker " + brokerEntry.getKey() + " does not support the given " +
-                        "feature range.");
+                    "version. It supports " + brokerRange.min() + " to " + brokerRange.max() + ".");
             }
         }
 
         if (currentVersion != null && newVersion < currentVersion) {
             if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade.");
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Can't downgrade the version of this feature without setting the " +
+                    "upgrade type to either safe or unsafe downgrade.");
             }
         }
 
-        records.add(new ApiMessageAndVersion(
+        if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+            // Perform additional checks if we're updating metadata.version
+            return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
+        } else {
+            records.add(new ApiMessageAndVersion(
+                new FeatureLevelRecord()
+                    .setName(featureName)
+                    .setFeatureLevel(newVersion),
+                FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            return ApiError.NONE;
+        }
+    }
+
+    private ApiError invalidUpdateVersion(String feature, short version, String message) {
+        String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message);
+        log.debug(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    /**
+     * Perform some additional validation for metadata.version updates.
+     */
+    private ApiError updateMetadataVersion(
+        short newVersionLevel,
+        boolean allowUnsafeDowngrade,
+        Consumer<ApiMessageAndVersion> recordConsumer
+    ) {
+        Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
+        if (!quorumSupported.isPresent()) {
+            return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version.");
+        }
+
+        if (newVersionLevel <= 0) {
+            return invalidMetadataVersion(newVersionLevel, "KRaft mode/the quorum does not support metadata.version values less than 1.");
+        }
+
+        if (!quorumSupported.get().contains(newVersionLevel)) {
+            return invalidMetadataVersion(newVersionLevel, "The controller quorum does support this version.");
+        }
+
+        MetadataVersion currentVersion = metadataVersion();
+        final MetadataVersion newVersion;
+        try {
+            newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
+        } catch (IllegalArgumentException e) {
+            return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
+        }
+
+        if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && newVersion.isLessThan(currentVersion)) {
+            // This is a downgrade
+            boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
+            if (!metadataChanged) {
+                log.info("Downgrading metadata.version from {} to {}.", currentVersion, newVersion);
+            } else {
+                return invalidMetadataVersion(newVersionLevel, "Unsafe metadata.version downgrades are not supported.");
+            }
+        }
+
+        recordConsumer.accept(new ApiMessageAndVersion(
             new FeatureLevelRecord()
-                .setName(featureName)
-                .setFeatureLevel(newVersion),
-            FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+                .setName(MetadataVersion.FEATURE_NAME)
+                .setFeatureLevel(newVersionLevel), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
         return ApiError.NONE;
     }
 
-    FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) {
+    private ApiError invalidMetadataVersion(short version, String message) {
+        String errorMessage = String.format("Invalid metadata.version %d. %s", version, message);
+        log.error(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    FinalizedControllerFeatures finalizedFeatures(long epoch) {
         Map<String, Short> features = new HashMap<>();
-        for (Entry<String, Short> entry : finalizedVersions.entrySet(lastCommittedOffset)) {
+        if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) {
+            features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
+        }
+        for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
             features.put(entry.getKey(), entry.getValue());
         }
-        return new FinalizedControllerFeatures(features, lastCommittedOffset);
+        return new FinalizedControllerFeatures(features, epoch);
     }
 
     public void replay(FeatureLevelRecord record) {
-        log.info("Setting feature {} to {}", record.name(), record.featureLevel());
-        finalizedVersions.put(record.name(), record.featureLevel());
+        if (!canSupportVersion(record.name(), record.featureLevel())) {
+            throw new RuntimeException("Controller cannot support feature " + record.name() +
+                                       " at version " + record.featureLevel());
+        }
+
+        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+            log.info("Setting metadata.version to {}", record.featureLevel());
+            metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
+        } else {
+            log.info("Setting feature {} to {}", record.name(), record.featureLevel());
+            finalizedVersions.put(record.name(), record.featureLevel());
+        }
     }
 
     class FeatureControlIterator implements Iterator<List<ApiMessageAndVersion>> {
         private final Iterator<Entry<String, Short>> iterator;
+        private final MetadataVersion metadataVersion;
+        private boolean wroteVersion = false;
 
         FeatureControlIterator(long epoch) {
             this.iterator = finalizedVersions.entrySet(epoch).iterator();
+            this.metadataVersion = FeatureControlManager.this.metadataVersion.get(epoch);
+            if (this.metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
+                this.wroteVersion = true;
+            }
         }
 
         @Override
         public boolean hasNext() {
-            return iterator.hasNext();
+            return !wroteVersion || iterator.hasNext();
         }
 
         @Override
         public List<ApiMessageAndVersion> next() {
+            // Write the metadata.version first
+            if (!wroteVersion) {
+                wroteVersion = true;
+                return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
+                    .setName(MetadataVersion.FEATURE_NAME)
+                    .setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+            }
+            // Then write the rest of the features
             if (!hasNext()) throw new NoSuchElementException();
             Entry<String, Short> entry = iterator.next();
             return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
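
The downgrade path above only succeeds when checkIfMetadataChanged reports that no record formats differ between the two versions; anything else requires the unsafe downgrade type. As a rough sketch (not part of this patch), an operator could drive this validation through the Admin client, assuming the KIP-778 FeatureUpdate(short, UpgradeType) constructor and a hypothetical cluster address:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.FeatureUpdate;
    import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

    import java.util.Collections;
    import java.util.Properties;

    public class MetadataVersionDowngradeSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // hypothetical cluster
            try (Admin admin = Admin.create(props)) {
                // Request a lower metadata.version feature level; the controller
                // rejects this unless checkIfMetadataChanged says the downgrade
                // loses no metadata. The target level 4 is invented here.
                FeatureUpdate update = new FeatureUpdate(
                    (short) 4, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
                admin.updateFeatures(
                    Collections.singletonMap("metadata.version", update),
                    new UpdateFeaturesOptions()).all().get();
            }
        }
    }
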
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index cea62c8f00..78cebd892c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -88,6 +88,7 @@ import org.apache.kafka.raft.BatchReader;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -134,6 +135,10 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
  * the fact that the controller may have several operations in progress at any given
  * point.  The future associated with each operation will not be completed until the
  * results of the operation have been made durable to the metadata log.
+ *
+ * The QuorumController uses the "metadata.version" feature flag as a mechanism to control
+ * the usage of new log record schemas. Starting with 3.3, this version must be set before
+ * the controller can fully initialize.
  */
 public final class QuorumController implements Controller {
     /**
@@ -161,6 +166,7 @@ public final class QuorumController implements Controller {
         private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
         private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();
         private Map<String, Object> staticConfig = Collections.emptyMap();
+        private BootstrapMetadata bootstrapMetadata = null;
 
         public Builder(int nodeId, String clusterId) {
             this.nodeId = nodeId;
@@ -241,6 +247,11 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
+            this.bootstrapMetadata = bootstrapMetadata;
+            return this;
+        }
+
         public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
             this.createTopicPolicy = createTopicPolicy;
             return this;
@@ -271,6 +282,9 @@ public final class QuorumController implements Controller {
             if (raftClient == null) {
                 throw new RuntimeException("You must set a raft client.");
             }
+            if (bootstrapMetadata == null || bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+                throw new RuntimeException("You must specify an initial metadata.version using the kafka-storage tool.");
+            }
             if (quorumFeatures == null) {
                 throw new RuntimeException("You must specify the quorum features");
             }
@@ -293,7 +307,7 @@ public final class QuorumController implements Controller {
                     defaultNumPartitions, isLeaderRecoverySupported, replicaPlacer, snapshotMaxNewRecordBytes,
                     leaderImbalanceCheckIntervalNs, sessionTimeoutNs, controllerMetrics,
                     createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer,
-                    staticConfig);
+                    staticConfig, bootstrapMetadata);
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
                 throw e;
@@ -886,16 +900,58 @@ public final class QuorumController implements Controller {
                             newEpoch + ", but we never renounced controller epoch " +
                             curEpoch);
                     }
-                    log.info(
-                        "Becoming the active controller at epoch {}, committed offset {} and committed epoch {}.",
-                        newEpoch, lastCommittedOffset, lastCommittedEpoch
-                    );
+
 
                     curClaimEpoch = newEpoch;
                     controllerMetrics.setActive(true);
                     writeOffset = lastCommittedOffset;
                     clusterControl.activate();
 
+                    // Check if we need to bootstrap metadata into the log. This must happen before we can
+                    // write any other records to the log since we need the metadata.version to determine the correct
+                    // record version.
+                    final MetadataVersion metadataVersion;
+                    if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+                        final CompletableFuture<Map<String, ApiError>> future;
+                        if (!bootstrapMetadata.metadataVersion().isKRaftSupported()) {
+                            metadataVersion = MetadataVersion.UNINITIALIZED;
+                            future = new CompletableFuture<>();
+                            future.completeExceptionally(
+                                new IllegalStateException("Cannot become leader without an initial metadata.version of " +
+                                    "at least 1. Got " + bootstrapMetadata.metadataVersion().featureLevel()));
+                        } else {
+                            metadataVersion = bootstrapMetadata.metadataVersion();
+                            future = appendWriteEvent("bootstrapMetadata", OptionalLong.empty(), () -> {
+                                if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
+                                    log.info("Initializing metadata.version to {}", metadataVersion.featureLevel());
+                                } else {
+                                    log.info("Upgrading from KRaft preview. Initializing metadata.version to {}",
+                                        metadataVersion.featureLevel());
+                                }
+                                return ControllerResult.atomicOf(bootstrapMetadata.records(), null);
+                            });
+                        }
+                        future.whenComplete((result, exception) -> {
+                            if (exception != null) {
+                                log.error("Failed to bootstrap metadata.", exception);
+                                appendRaftEvent("bootstrapMetadata[" + curClaimEpoch + "]", () -> {
+                                    log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
+                                             "metadata. Reverting to last committed offset {}.",
+                                        curClaimEpoch, lastCommittedOffset);
+                                    renounce();
+                                });
+                            }
+                        });
+                    } else {
+                        metadataVersion = featureControl.metadataVersion();
+                    }
+
+
+                    log.info(
+                        "Becoming the active controller at epoch {}, committed offset {}, committed epoch {}, and metadata.version {}",
+                        newEpoch, lastCommittedOffset, lastCommittedEpoch, metadataVersion.featureLevel()
+                    );
+
                     // Before switching to active, create an in-memory snapshot at the last committed offset. This is
                     // required because the active controller assumes that there is always an in-memory snapshot at the
                     // last committed offset.
@@ -908,7 +964,7 @@ public final class QuorumController implements Controller {
             } else if (curClaimEpoch != -1) {
                 appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
                     log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
-                            "log event. Reverting to last committed offset {}.", curClaimEpoch,
+                             "log event. Reverting to last committed offset {}.", curClaimEpoch,
                         lastCommittedOffset);
                     renounce();
                 });
@@ -1343,6 +1399,8 @@ public final class QuorumController implements Controller {
      */
     private ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.DEFERRED;
 
+    private final BootstrapMetadata bootstrapMetadata;
+
     private QuorumController(LogContext logContext,
                              int nodeId,
                              String clusterId,
@@ -1363,7 +1421,8 @@ public final class QuorumController implements Controller {
                              Optional<AlterConfigPolicy> alterConfigPolicy,
                              ConfigurationValidator configurationValidator,
                              Optional<ClusterMetadataAuthorizer> authorizer,
-                             Map<String, Object> staticConfig) {
+                             Map<String, Object> staticConfig,
+                             BootstrapMetadata bootstrapMetadata) {
         this.logContext = logContext;
         this.log = logContext.logger(QuorumController.class);
         this.nodeId = nodeId;
@@ -1414,6 +1473,7 @@ public final class QuorumController implements Controller {
         authorizer.ifPresent(a -> a.setAclMutator(this));
         this.aclControlManager = new AclControlManager(snapshotRegistry, authorizer);
         this.raftClient = raftClient;
+        this.bootstrapMetadata = bootstrapMetadata;
         this.metaLogListener = new QuorumMetaLogListener();
         this.curClaimEpoch = -1;
         this.writeOffset = -1L;
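
With these changes the Builder refuses to construct a controller unless bootstrap metadata carrying a real metadata.version is supplied. A minimal sketch of the new wiring, with placeholder ids; the other required setters (raft client, quorum features) are elided, see QuorumControllerTestEnv later in this patch for a complete setup:

    import org.apache.kafka.controller.BootstrapMetadata;
    import org.apache.kafka.controller.QuorumController;
    import org.apache.kafka.server.common.MetadataVersion;

    public class ControllerBootstrapSketch {
        public static void main(String[] args) {
            // Node id and cluster id are placeholders; setRaftClient and
            // setQuorumFeatures (elided) are still required before build().
            QuorumController.Builder builder =
                new QuorumController.Builder(3000, "ABCDEFGHIJKLMNOPQRSTUV")
                    .setBootstrapMetadata(BootstrapMetadata.create(MetadataVersion.latest()));
            // build() now throws unless bootstrapMetadata is set to something
            // other than MetadataVersion.UNINITIALIZED.
        }
    }
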
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 0ee27bd4f3..80a5c0a0b1 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -17,32 +17,102 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.Collectors;
 
 /**
- * A holder class of the local node's supported feature flags.
+ * A holder class of the local node's supported feature flags as well as the ApiVersions of other nodes.
  */
 public class QuorumFeatures {
+    private static final Logger log = LoggerFactory.getLogger(QuorumFeatures.class);
+
     private final int nodeId;
+    private final ApiVersions apiVersions;
     private final Map<String, VersionRange> supportedFeatures;
+    private final List<Integer> quorumNodeIds;
 
-    QuorumFeatures(int nodeId,
-                          Map<String, VersionRange> supportedFeatures) {
+    QuorumFeatures(
+        int nodeId,
+        ApiVersions apiVersions,
+        Map<String, VersionRange> supportedFeatures,
+        List<Integer> quorumNodeIds
+    ) {
         this.nodeId = nodeId;
+        this.apiVersions = apiVersions;
         this.supportedFeatures = Collections.unmodifiableMap(supportedFeatures);
+        this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds);
     }
 
-    public static QuorumFeatures create(int nodeId,
-                                        Map<String, VersionRange> supportedFeatures) {
-        return new QuorumFeatures(nodeId, supportedFeatures);
+    public static QuorumFeatures create(
+        int nodeId,
+        ApiVersions apiVersions,
+        Map<String, VersionRange> supportedFeatures,
+        Collection<Node> quorumNodes
+    ) {
+        List<Integer> nodeIds = quorumNodes.stream().map(Node::id).collect(Collectors.toList());
+        return new QuorumFeatures(nodeId, apiVersions, supportedFeatures, nodeIds);
     }
 
     public static Map<String, VersionRange> defaultFeatureMap() {
-        return Collections.emptyMap();
+        Map<String, VersionRange> features = new HashMap<>(1);
+        features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
+        return features;
+    }
+
+    Optional<VersionRange> quorumSupportedFeature(String featureName) {
+        List<VersionRange> supportedVersions = new ArrayList<>(quorumNodeIds.size());
+        for (int nodeId : quorumNodeIds) {
+            if (nodeId == this.nodeId) {
+                // We get this node's features from "supportedFeatures"
+                continue;
+            }
+            NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(nodeId));
+            if (nodeVersions == null) {
+                continue;
+            }
+            SupportedVersionRange supportedRange = nodeVersions.supportedFeatures().get(featureName);
+            if (supportedRange == null) {
+                supportedVersions.add(VersionRange.of(0, 0));
+            } else {
+                supportedVersions.add(VersionRange.of(supportedRange.min(), supportedRange.max()));
+            }
+        }
+        localSupportedFeature(featureName).ifPresent(supportedVersions::add);
+
+        if (supportedVersions.isEmpty()) {
+            return Optional.empty();
+        } else {
+            OptionalInt highestMinVersion = supportedVersions.stream().mapToInt(VersionRange::min).max();
+            OptionalInt lowestMaxVersion = supportedVersions.stream().mapToInt(VersionRange::max).min();
+            if (highestMinVersion.isPresent() && lowestMaxVersion.isPresent()) {
+                if (highestMinVersion.getAsInt() <= lowestMaxVersion.getAsInt()) {
+                    if (supportedVersions.size() < quorumNodeIds.size()) {
+                        log.info("Using incomplete set of quorum supported features.");
+                    }
+                    return Optional.of(VersionRange.of((short) highestMinVersion.getAsInt(), (short) lowestMaxVersion.getAsInt()));
+                } else {
+                    return Optional.empty();
+                }
+            } else {
+                return Optional.empty();
+            }
+        }
     }
 
     Optional<VersionRange> localSupportedFeature(String featureName) {
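
quorumSupportedFeature computes the quorum-wide range as the intersection of the per-node ranges: the highest of the minimums paired with the lowest of the maximums, or empty when those cross. A small standalone illustration of that rule, using invented sample ranges:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;

    import org.apache.kafka.metadata.VersionRange;

    final class RangeIntersectionSketch {
        static Optional<VersionRange> intersect(List<VersionRange> ranges) {
            if (ranges.isEmpty()) {
                return Optional.empty();
            }
            int highestMin = ranges.stream().mapToInt(VersionRange::min).max().getAsInt();
            int lowestMax = ranges.stream().mapToInt(VersionRange::max).min().getAsInt();
            return highestMin <= lowestMax
                ? Optional.of(VersionRange.of(highestMin, lowestMax))
                : Optional.empty();
        }

        public static void main(String[] args) {
            // Nodes supporting [1,6], [3,7] and [2,5] agree on levels 3..5.
            System.out.println(intersect(Arrays.asList(
                VersionRange.of(1, 6), VersionRange.of(3, 7), VersionRange.of(2, 5))));
        }
    }
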
diff --git a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
index c637918618..026dabf2f7 100644
--- a/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metadata.AccessControlEntryRecord;
 import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
 import org.apache.kafka.metadata.authorizer.StandardAcl;
 import org.apache.kafka.metadata.authorizer.StandardAclWithId;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -51,6 +52,10 @@ public final class AclsDelta {
         this.isSnapshotDelta = true;
     }
 
+    public void handleMetadataVersionChange(MetadataVersion newVersion) {
+        // no-op
+    }
+
     public boolean isSnapshotDelta() {
         return isSnapshotDelta;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
index 4b574b3ada..1255f75e03 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
@@ -19,6 +19,7 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -47,6 +48,10 @@ public final class ClientQuotasDelta {
         }
     }
 
+    public void handleMetadataVersionChange(MetadataVersion newVersion) {
+        // no-op
+    }
+
     public void replay(ClientQuotaRecord record) {
         ClientQuotaEntity entity = ClientQuotaImage.dataToEntity(record.entity());
         ClientQuotaDelta change = changes.computeIfAbsent(entity, __ ->
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index 6c48b8ecde..1c4d66b9e9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -61,6 +62,10 @@ public final class ClusterDelta {
         }
     }
 
+    public void handleMetadataVersionChange(MetadataVersion newVersion) {
+        // no-op
+    }
+
     public void replay(RegisterBrokerRecord record) {
         BrokerRegistration broker = BrokerRegistration.fromRecord(record);
         changedBrokers.put(broker.id(), Optional.of(broker));
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
index d0f5848770..2a4bf1a1ca 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -52,6 +53,10 @@ public final class ConfigurationsDelta {
         }
     }
 
+    public void handleMetadataVersionChange(MetadataVersion newVersion) {
+        // no-op
+    }
+
     public void replay(ConfigRecord record) {
         ConfigResource resource =
             new ConfigResource(Type.forId(record.resourceType()), record.resourceName());
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index ca472322d6..28eb187bfb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -19,6 +19,7 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -34,6 +35,8 @@ public final class FeaturesDelta {
 
     private final Map<String, Optional<Short>> changes = new HashMap<>();
 
+    private MetadataVersion metadataVersionChange = null;
+
     public FeaturesDelta(FeaturesImage image) {
         this.image = image;
     }
@@ -42,6 +45,10 @@ public final class FeaturesDelta {
         return changes;
     }
 
+    public Optional<MetadataVersion> metadataVersionChange() {
+        return Optional.ofNullable(metadataVersionChange);
+    }
+
     public void finishSnapshot() {
         for (String featureName : image.finalizedVersions().keySet()) {
             if (!changes.containsKey(featureName)) {
@@ -51,11 +58,19 @@ public final class FeaturesDelta {
     }
 
     public void replay(FeatureLevelRecord record) {
-        changes.put(record.name(), Optional.of(record.featureLevel()));
+        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+            metadataVersionChange = MetadataVersion.fromFeatureLevel(record.featureLevel());
+        } else {
+            changes.put(record.name(), Optional.of(record.featureLevel()));
+        }
     }
 
     public void replay(RemoveFeatureLevelRecord record) {
-        changes.put(record.name(), Optional.empty());
+        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+            metadataVersionChange = null;
+        } else {
+            changes.put(record.name(), Optional.empty());
+        }
     }
 
     public FeaturesImage apply() {
@@ -80,13 +95,20 @@ public final class FeaturesDelta {
             }
         }
 
-        return new FeaturesImage(newFinalizedVersions);
+        final MetadataVersion metadataVersion;
+        if (metadataVersionChange == null) {
+            metadataVersion = image.metadataVersion();
+        } else {
+            metadataVersion = metadataVersionChange;
+        }
+        return new FeaturesImage(newFinalizedVersions, metadataVersion);
     }
 
     @Override
     public String toString() {
         return "FeaturesDelta(" +
             "changes=" + changes +
+            ", metadataVersionChange=" + metadataVersionChange +
             ')';
     }
 }
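
A pending metadata.version change is held apart from the regular feature changes and only folded in by apply(). A compact sketch using names from this patch, with FeaturesImage.EMPTY as the base image:

    import org.apache.kafka.common.metadata.FeatureLevelRecord;
    import org.apache.kafka.image.FeaturesDelta;
    import org.apache.kafka.image.FeaturesImage;
    import org.apache.kafka.server.common.MetadataVersion;

    public class FeaturesDeltaApplySketch {
        public static void main(String[] args) {
            FeaturesDelta delta = new FeaturesDelta(FeaturesImage.EMPTY);
            delta.replay(new FeatureLevelRecord()
                .setName(MetadataVersion.FEATURE_NAME)
                .setFeatureLevel(MetadataVersion.IBP_3_3_IV0.featureLevel()));
            // apply() carries the staged version into the new image.
            FeaturesImage next = delta.apply();
            System.out.println(next.metadataVersion());  // IBP_3_3_IV0
        }
    }
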
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index 7e0f7fb435..be91b07090 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -19,6 +19,7 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,18 +38,25 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
  * This class is thread-safe.
  */
 public final class FeaturesImage {
-    public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap());
+    public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.UNINITIALIZED);
 
     private final Map<String, Short> finalizedVersions;
 
-    public FeaturesImage(Map<String, Short> finalizedVersions) {
+    private final MetadataVersion metadataVersion;
+
+    public FeaturesImage(Map<String, Short> finalizedVersions, MetadataVersion metadataVersion) {
         this.finalizedVersions = Collections.unmodifiableMap(finalizedVersions);
+        this.metadataVersion = metadataVersion;
     }
 
     public boolean isEmpty() {
         return finalizedVersions.isEmpty();
     }
 
+    public MetadataVersion metadataVersion() {
+        return metadataVersion;
+    }
+
     Map<String, Short> finalizedVersions() {
         return finalizedVersions;
     }
@@ -59,7 +67,16 @@ public final class FeaturesImage {
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
         List<ApiMessageAndVersion> batch = new ArrayList<>();
+        // Write out the metadata.version record first, and then the rest of the finalized features
+        if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+            batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+                setName(MetadataVersion.FEATURE_NAME).
+                setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+        }
         for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
+            if (entry.getKey().equals(MetadataVersion.FEATURE_NAME)) {
+                continue;
+            }
             batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(entry.getKey()).
                 setFeatureLevel(entry.getValue()), FEATURE_LEVEL_RECORD.highestSupportedVersion()));
@@ -84,6 +101,7 @@ public final class FeaturesImage {
     public String toString() {
         return "FeaturesImage{" +
                 "finalizedVersions=" + finalizedVersions +
+                ", metadataVersion=" + metadataVersion +
                 '}';
     }
 }
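
Since write() emits the metadata.version record before any other finalized feature, a snapshot reader can fix the record deserialization level before decoding the rest. A minimal sketch of that ordering, using an invented feature name:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.image.FeaturesImage;
    import org.apache.kafka.server.common.ApiMessageAndVersion;
    import org.apache.kafka.server.common.MetadataVersion;

    public class FeaturesImageWriteSketch {
        public static void main(String[] args) {
            FeaturesImage image = new FeaturesImage(
                Collections.singletonMap("example.feature", (short) 1),  // invented feature
                MetadataVersion.IBP_3_3_IV0);
            List<ApiMessageAndVersion> records = new ArrayList<>();
            image.write(records::addAll);
            // records.get(0) is the metadata.version FeatureLevelRecord;
            // the "example.feature" record follows it.
        }
    }
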
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 0ba285f8da..f40ae9ae7f 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -37,9 +37,11 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 
 
 /**
@@ -143,6 +145,14 @@ public final class MetadataDelta {
         return aclsDelta;
     }
 
+    public Optional<MetadataVersion> metadataVersionChanged() {
+        if (featuresDelta == null) {
+            return Optional.empty();
+        } else {
+            return featuresDelta.metadataVersionChange();
+        }
+    }
+
     public void read(long highestOffset, int highestEpoch, Iterator<List<ApiMessageAndVersion>> reader) {
         while (reader.hasNext()) {
             List<ApiMessageAndVersion> batch = reader.next();
@@ -253,6 +263,15 @@ public final class MetadataDelta {
 
     public void replay(FeatureLevelRecord record) {
         getOrCreateFeaturesDelta().replay(record);
+        featuresDelta.metadataVersionChange().ifPresent(changedMetadataVersion -> {
+            // If the metadata.version changed, we need to immediately check whether any metadata needs to be downgraded.
+            getOrCreateClusterDelta().handleMetadataVersionChange(changedMetadataVersion);
+            getOrCreateConfigsDelta().handleMetadataVersionChange(changedMetadataVersion);
+            getOrCreateTopicsDelta().handleMetadataVersionChange(changedMetadataVersion);
+            getOrCreateClientQuotasDelta().handleMetadataVersionChange(changedMetadataVersion);
+            getOrCreateProducerIdsDelta().handleMetadataVersionChange(changedMetadataVersion);
+            getOrCreateAclsDelta().handleMetadataVersionChange(changedMetadataVersion);
+        });
     }
 
     public void replay(BrokerRegistrationChangeRecord record) {
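
Replaying a metadata.version FeatureLevelRecord now fans out handleMetadataVersionChange to every child delta (all no-ops so far) and surfaces the change through metadataVersionChanged(). A short sketch of observing that from the outside, assuming MetadataImage.EMPTY as the starting image:

    import org.apache.kafka.common.metadata.FeatureLevelRecord;
    import org.apache.kafka.image.MetadataDelta;
    import org.apache.kafka.image.MetadataImage;
    import org.apache.kafka.server.common.MetadataVersion;

    public class MetadataVersionChangeSketch {
        public static void main(String[] args) {
            MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
            delta.replay(new FeatureLevelRecord()
                .setName(MetadataVersion.FEATURE_NAME)
                .setFeatureLevel(MetadataVersion.IBP_3_3_IV0.featureLevel()));
            delta.metadataVersionChanged().ifPresent(version ->
                System.out.println("metadata.version changed to " + version));
        }
    }
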
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index c887572ea8..48bed5f8a9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -120,6 +120,8 @@ public final class MetadataImage {
     }
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
+        // Features should be written out first so we can include the metadata.version at the beginning of the
+        // snapshot
         features.write(out);
         cluster.write(out);
         topics.write(out);
diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
index 99dd207863..6205279928 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
+import org.apache.kafka.server.common.MetadataVersion;
 
 
 public final class ProducerIdsDelta {
@@ -39,6 +40,10 @@ public final class ProducerIdsDelta {
         // Nothing to do
     }
 
+    public void handleMetadataVersionChange(MetadataVersion newVersion) {
+        // no-op
+    }
+
     public void replay(ProducerIdsRecord record) {
         nextProducerId = record.nextProducerId();
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index f9d8087879..66f12102a5 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -117,6 +118,10 @@ public final class TopicsDelta {
         }
     }
 
+    public void handleMetadataVersionChange(MetadataVersion newVersion) {
+        // no-op
+    }
+
     public TopicsImage apply() {
         Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
         Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java b/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
index 178d338344..ed0648bef5 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
@@ -37,6 +37,10 @@ public class VersionRange {
         return new VersionRange(min, max);
     }
 
+    public static VersionRange of(int min, int max) {
+        return new VersionRange((short) min, (short) max);
+    }
+
     public short min() {
         return min;
     }
diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
similarity index 99%
rename from shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
rename to metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 9edf8685f1..1e5e6371d0 100644
--- a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.metadata.util;
 
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileWriter.java
new file mode 100644
index 0000000000..5608bdc464
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileWriter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.MetadataRecordSerde;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.BatchMemoryPool;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
+
+
+/**
+ * Write an arbitrary set of metadata records into the Kafka metadata snapshot format. The resulting snapshot will use an
+ * epoch of zero and an initial offset of zero. This class should not be used for creating actual metadata snapshots.
+ */
+public class SnapshotFileWriter implements AutoCloseable {
+    private final FileChannel channel;
+    private final BatchAccumulator<ApiMessageAndVersion> batchAccumulator;
+
+    SnapshotFileWriter(FileChannel channel, BatchAccumulator<ApiMessageAndVersion> batchAccumulator) {
+        this.channel = channel;
+        this.batchAccumulator = batchAccumulator;
+    }
+
+    public void append(ApiMessageAndVersion apiMessageAndVersion) {
+        batchAccumulator.append(0, Collections.singletonList(apiMessageAndVersion));
+    }
+
+    public void append(List<ApiMessageAndVersion> messageBatch) {
+        batchAccumulator.append(0, messageBatch);
+    }
+
+    public void close() throws IOException {
+        for (BatchAccumulator.CompletedBatch<ApiMessageAndVersion> batch : batchAccumulator.drain()) {
+            Utils.writeFully(channel, batch.data.buffer());
+        }
+        channel.close();
+    }
+
+    public static SnapshotFileWriter open(Path snapshotPath) throws IOException {
+        BatchAccumulator<ApiMessageAndVersion> batchAccumulator = new BatchAccumulator<>(
+            0,
+            0,
+            Integer.MAX_VALUE,
+            MAX_BATCH_SIZE_BYTES,
+            new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
+            Time.SYSTEM,
+            CompressionType.NONE,
+            new MetadataRecordSerde());
+
+        FileChannel channel = FileChannel.open(snapshotPath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+
+        return new SnapshotFileWriter(channel, batchAccumulator);
+    }
+}
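
A brief usage sketch for the new writer, with a hypothetical snapshot path:

    import java.nio.file.Paths;

    import org.apache.kafka.common.metadata.FeatureLevelRecord;
    import org.apache.kafka.metadata.util.SnapshotFileWriter;
    import org.apache.kafka.server.common.ApiMessageAndVersion;
    import org.apache.kafka.server.common.MetadataVersion;

    public class SnapshotFileWriterSketch {
        public static void main(String[] args) throws Exception {
            // Write a single metadata.version record into a bootstrap-style snapshot.
            try (SnapshotFileWriter writer =
                     SnapshotFileWriter.open(Paths.get("/tmp/bootstrap.snapshot"))) {
                writer.append(new ApiMessageAndVersion(new FeatureLevelRecord()
                    .setName(MetadataVersion.FEATURE_NAME)
                    .setFeatureLevel(MetadataVersion.latest().featureLevel()),
                    (short) 0));
            }
        }
    }
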
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
new file mode 100644
index 0000000000..16e03a0501
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BootstrapMetadataTest {
+    @Test
+    public void testWriteAndReadBootstrapFile() throws Exception {
+        Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
+        BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0);
+        BootstrapMetadata.write(metadata, tmpDir);
+
+        assertTrue(Files.exists(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE)));
+
+        BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
+        assertEquals(metadata, newMetadata);
+    }
+
+    @Test
+    public void testNoBootstrapFile() throws Exception {
+        Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
+        BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
+        assertEquals(MetadataVersion.IBP_3_0_IV0, metadata.metadataVersion());
+        metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_2_IV0);
+        assertEquals(MetadataVersion.IBP_3_2_IV0, metadata.metadataVersion());
+    }
+
+    @Test
+    public void testExistingBootstrapFile() throws Exception {
+        Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
+        BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0), tmpDir);
+        assertThrows(IOException.class, () -> {
+            BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_1_IV0), tmpDir);
+        });
+    }
+
+    @Test
+    public void testEmptyBootstrapFile() throws Exception {
+        Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
+        Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
+        assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
+            "Should fail to load if no metadata.version is set");
+    }
+
+    @Test
+    public void testGarbageBootstrapFile() throws Exception {
+        Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
+        Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
+        Random random = new Random(1);
+        byte[] data = new byte[100];
+        random.nextBytes(data);
+        Files.write(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE), data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
+            "Should fail on invalid data");
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index f53b493a97..ec1fcdeb0a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.protocol.Errors;
@@ -33,6 +34,7 @@ import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -42,13 +44,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class FeatureControlManagerTest {
+
     @SuppressWarnings("unchecked")
     private static Map<String, VersionRange> rangeMap(Object... args) {
         Map<String, VersionRange> result = new HashMap<>();
         for (int i = 0; i < args.length; i += 3) {
             String feature = (String) args[i];
-            Integer low = (Integer) args[i + 1];
-            Integer high = (Integer) args[i + 2];
+            Number low = (Number) args[i + 1];
+            Number high = (Number) args[i + 2];
             result.put(feature, VersionRange.of(low.shortValue(), high.shortValue()));
         }
         return result;
@@ -58,21 +61,23 @@ public class FeatureControlManagerTest {
         Map<String, Short> result = new HashMap<>();
         for (int i = 0; i < args.length; i += 2) {
             String feature = (String) args[i];
-            Integer ver = (Integer) args[i + 1];
+            Number ver = (Number) args[i + 1];
             result.put(feature, ver.shortValue());
         }
         return result;
     }
 
     public static QuorumFeatures features(Object... args) {
-        return QuorumFeatures.create(0, rangeMap(args));
+        Map<String, VersionRange> features = QuorumFeatures.defaultFeatureMap();
+        features.putAll(rangeMap(args));
+        return new QuorumFeatures(0, new ApiVersions(), features, Collections.emptyList());
     }
 
     private static Map<String, Short> updateMap(Object... args) {
         Map<String, Short> result = new HashMap<>();
         for (int i = 0; i < args.length; i += 2) {
             String feature = (String) args[i];
-            Integer ver = (Integer) args[i + 1];
+            Number ver = (Number) args[i + 1];
             result.put(feature, ver.shortValue());
         }
         return result;
@@ -89,7 +94,7 @@ public class FeatureControlManagerTest {
             manager.finalizedFeatures(-1));
         assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given feature range."))),
+                    "Invalid update version 3 for feature foo. The quorum does not support the given feature version."))),
             manager.updateFeatures(updateMap("foo", 3),
                 Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
                 Collections.emptyMap(), false));
@@ -99,7 +104,7 @@ public class FeatureControlManagerTest {
         Map<String, ApiError> expectedMap = new HashMap<>();
         expectedMap.put("foo", ApiError.NONE);
         expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature."));
+                "Invalid update version 1 for feature bar. The controller does not support the given feature."));
         assertEquals(expectedMap, result.response());
         List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
         expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
@@ -138,7 +143,7 @@ public class FeatureControlManagerTest {
                     "foo",
                     new ApiError(
                         Errors.INVALID_UPDATE_VERSION,
-                        "Broker 5 does not support the given feature range."
+                        "Invalid update version 3 for feature foo. Broker 5 does not support this feature."
                     )
                 )
             ),
@@ -157,8 +162,8 @@ public class FeatureControlManagerTest {
 
         assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature without setting the upgrade type to " +
-                    "safe or unsafe downgrade."))),
+                    "Invalid update version 2 for feature foo. Can't downgrade the version of this feature " +
+                    "without setting the upgrade type to either safe or unsafe downgrade."))),
             manager.updateFeatures(updateMap("foo", 2),
                 Collections.emptyMap(), Collections.emptyMap(), false));
 
@@ -201,4 +206,81 @@ public class FeatureControlManagerTest {
                 setFeatureLevel((short) 1), (short) 0))),
             manager.iterator(Long.MAX_VALUE));
     }
+
+    @Test
+    public void testInitializeMetadataVersion() {
+        // Default QuorumFeatures
+        checkMetadataVersion(features(), MetadataVersion.IBP_3_0_IV0, Errors.NONE);
+        checkMetadataVersion(features(), MetadataVersion.latest(), Errors.NONE);
+        checkMetadataVersion(features(), MetadataVersion.UNINITIALIZED, Errors.INVALID_UPDATE_VERSION);
+        checkMetadataVersion(features(), MetadataVersion.IBP_2_7_IV1, Errors.INVALID_UPDATE_VERSION);
+
+        // Increased QuorumFeatures
+        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
+        checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0, Errors.INVALID_UPDATE_VERSION);
+
+        // Empty QuorumFeatures
+        features = new QuorumFeatures(0, new ApiVersions(), Collections.emptyMap(), Collections.emptyList());
+        checkMetadataVersion(features, MetadataVersion.latest(), Errors.INVALID_UPDATE_VERSION);
+        checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0, Errors.INVALID_UPDATE_VERSION);
+    }
+
+    @Test
+    public void reInitializeMetadataVersion() {
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        FeatureControlManager manager = new FeatureControlManager(logContext, features(), snapshotRegistry);
+        ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(MetadataVersion.IBP_3_0_IV0.featureLevel());
+        Errors actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
+        assertEquals(Errors.NONE, actual);
+        RecordTestUtils.replayAll(manager, result.records());
+
+        result = manager.initializeMetadataVersion(MetadataVersion.latest().featureLevel());
+        actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
+        assertEquals(Errors.INVALID_UPDATE_VERSION, actual);
+    }
+
+    public void checkMetadataVersion(QuorumFeatures features, MetadataVersion version, Errors expected) {
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        FeatureControlManager manager = new FeatureControlManager(logContext, features, snapshotRegistry);
+        ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(version.featureLevel());
+        Errors actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testDowngradeMetadataVersion() {
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
+        FeatureControlManager manager = new FeatureControlManager(logContext, features, snapshotRegistry);
+        ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(MetadataVersion.IBP_3_3_IV0.featureLevel());
+        RecordTestUtils.replayAll(manager, result.records());
+        assertEquals(manager.metadataVersion(), MetadataVersion.latest());
+
+        result = manager.updateFeatures(
+            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
+            Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
+            Collections.emptyMap(),
+            true);
+        assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
+
+
+        result = manager.updateFeatures(
+            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
+            Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+            Collections.emptyMap(),
+            true);
+        assertEquals(Errors.NONE, result.response().get(MetadataVersion.FEATURE_NAME).error());
+
+        result = manager.updateFeatures(
+                Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV0.featureLevel()),
+                Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+                Collections.emptyMap(),
+                true);
+        assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
+        assertEquals("Invalid update version 1 for feature metadata.version. The quorum does not support the given feature version.",
+            result.response().get(MetadataVersion.FEATURE_NAME).message());
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 148c5720b6..e51b38e783 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -67,6 +67,7 @@ import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -86,6 +87,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.RecordsSnapshotReader;
@@ -210,7 +212,7 @@ public class QuorumControllerTest {
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
                 b.setConfigSchema(SCHEMA);
-            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty());
+            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), MetadataVersion.latest());
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -302,7 +304,7 @@ public class QuorumControllerTest {
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
                 b.setConfigSchema(SCHEMA);
-            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs));
+            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), MetadataVersion.latest());
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -439,7 +441,7 @@ public class QuorumControllerTest {
                         setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
                         setFeatures(brokerFeatures()).
                         setListeners(listeners));
-                assertEquals(0L, reply.get().epoch());
+                assertEquals(2L, reply.get().epoch());
                 CreateTopicsRequestData createTopicsRequestData =
                     new CreateTopicsRequestData().setTopics(
                         new CreatableTopicCollection(Collections.singleton(
@@ -455,7 +457,7 @@ public class QuorumControllerTest {
                             get().topics().find("foo").errorMessage());
                 assertEquals(new BrokerHeartbeatReply(true, false, false, false),
                     active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                            setWantFence(false).setBrokerEpoch(0L).setBrokerId(0).
+                            setWantFence(false).setBrokerEpoch(2L).setBrokerId(0).
                             setCurrentMetadataOffset(100000L)).get());
                 assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
                     createTopicsRequestData, Collections.singleton("foo")).
@@ -483,6 +485,10 @@ public class QuorumControllerTest {
 
     private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
         BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
+        features.add(new BrokerRegistrationRequestData.Feature()
+            .setName(MetadataVersion.FEATURE_NAME)
+            .setMinSupportedVersion(MetadataVersion.IBP_3_0_IV0.featureLevel())
+            .setMaxSupportedVersion(MetadataVersion.latest().featureLevel()));
         return features;
     }
 
@@ -680,6 +686,9 @@ public class QuorumControllerTest {
 
     private List<ApiMessageAndVersion> expectedSnapshotContent(Uuid fooId, Map<Integer, Long> brokerEpochs) {
         return Arrays.asList(
+            new ApiMessageAndVersion(new FeatureLevelRecord().
+                setName(MetadataVersion.FEATURE_NAME).
+                setFeatureLevel(MetadataVersion.latest().featureLevel()), (short) 0),
             new ApiMessageAndVersion(new TopicRecord().
                 setName("foo").setTopicId(fooId), (short) 0),
             new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index b7bff3883b..af9cd218b4 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -21,9 +21,12 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.controller.QuorumController.Builder;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +36,8 @@ import java.util.List;
 import java.util.OptionalInt;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class QuorumControllerTestEnv implements AutoCloseable {
     private static final Logger log =
@@ -45,23 +50,27 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         LocalLogManagerTestEnv logEnv,
         Consumer<QuorumController.Builder> builderConsumer
     ) throws Exception {
-        this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty());
+        this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), MetadataVersion.latest());
     }
 
     public QuorumControllerTestEnv(
         LocalLogManagerTestEnv logEnv,
         Consumer<Builder> builderConsumer,
         OptionalLong sessionTimeoutMillis,
-        OptionalLong leaderImbalanceCheckIntervalNs
+        OptionalLong leaderImbalanceCheckIntervalNs,
+        MetadataVersion metadataVersion
     ) throws Exception {
         this.logEnv = logEnv;
         int numControllers = logEnv.logManagers().size();
         this.controllers = new ArrayList<>(numControllers);
         try {
+            ApiVersions apiVersions = new ApiVersions();
+            List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().collect(Collectors.toList());
             for (int i = 0; i < numControllers; i++) {
                 QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
                 builder.setRaftClient(logEnv.logManagers().get(i));
-                builder.setQuorumFeatures(new QuorumFeatures(i, QuorumFeatures.defaultFeatureMap()));
+                builder.setBootstrapMetadata(BootstrapMetadata.create(metadataVersion));
+                builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
                 sessionTimeoutMillis.ifPresent(timeout -> {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
                 });
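QuorumControllerTestEnv now wires each controller the way production does: a BootstrapMetadata holding the initial metadata.version, plus a QuorumFeatures built from a shared ApiVersions and the ids of all quorum members. A hedged sketch of that wiring in isolation, with the node count and ids chosen purely for illustration:

    // Sketch under assumptions: three controllers, local id 0. QuorumFeatures
    // merges the local feature map with what peers advertise via ApiVersions.
    ApiVersions apiVersions = new ApiVersions();
    List<Integer> nodeIds = Arrays.asList(0, 1, 2);
    QuorumFeatures quorumFeatures =
        new QuorumFeatures(0, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds);
    BootstrapMetadata bootstrapMetadata = BootstrapMetadata.create(MetadataVersion.latest());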
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
new file mode 100644
index 0000000000..0194cd674e
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.metadata.VersionRange;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class QuorumFeaturesTest {
+    @Test
+    public void testQuorumFeatures() {
+        ApiVersions apiVersions = new ApiVersions();
+        Map<String, VersionRange> featureMap = new HashMap<>(2);
+        featureMap.put("foo", VersionRange.of(1, 2));
+        featureMap.put("bar", VersionRange.of(3, 5));
+
+        List<Integer> nodeIds = new ArrayList<>();
+        nodeIds.add(0);
+
+        QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, featureMap, nodeIds);
+        assertLocalFeature(quorumFeatures, "foo", 1, 2);
+        assertLocalFeature(quorumFeatures, "bar", 3, 5);
+        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
+        assertQuorumFeature(quorumFeatures, "bar", 3, 5);
+
+        // Add a second node with identical features
+        nodeIds.add(1);
+        apiVersions.update("1", nodeApiVersions(featureMap));
+        assertLocalFeature(quorumFeatures, "foo", 1, 2);
+        assertLocalFeature(quorumFeatures, "bar", 3, 5);
+        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
+        assertQuorumFeature(quorumFeatures, "bar", 3, 5);
+
+        // Change the supported features of one node
+        Map<String, VersionRange> node1Features = new HashMap<>(featureMap);
+        node1Features.put("bar", VersionRange.of(3, 4));
+        apiVersions.update("1", nodeApiVersions(node1Features));
+        assertLocalFeature(quorumFeatures, "foo", 1, 2);
+        assertLocalFeature(quorumFeatures, "bar", 3, 5);
+        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
+        assertQuorumFeature(quorumFeatures, "bar", 3, 4);
+
+        // Add a third node with no features
+        nodeIds.add(2);
+        apiVersions.update("1", NodeApiVersions.create());
+        assertFalse(quorumFeatures.quorumSupportedFeature("foo").isPresent());
+        assertFalse(quorumFeatures.quorumSupportedFeature("bar").isPresent());
+    }
+
+    public static NodeApiVersions nodeApiVersions(Map<String, VersionRange> featureMap) {
+        List<ApiVersionsResponseData.SupportedFeatureKey> supportedFeatures = new ArrayList<>(featureMap.size());
+        featureMap.forEach((featureName, versionRange) -> {
+            supportedFeatures.add(new ApiVersionsResponseData.SupportedFeatureKey()
+                .setName(featureName)
+                .setMinVersion(versionRange.min())
+                .setMaxVersion(versionRange.max()));
+        });
+        return new NodeApiVersions(Collections.emptyList(), supportedFeatures);
+    }
+
+    private void assertLocalFeature(QuorumFeatures features, String name, int expectedMin, int expectedMax) {
+        Optional<VersionRange> featureRange = features.localSupportedFeature(name);
+        assertTrue(featureRange.isPresent());
+        assertEquals(expectedMin, featureRange.get().min());
+        assertEquals(expectedMax, featureRange.get().max());
+    }
+
+    private void assertQuorumFeature(QuorumFeatures features, String name, int expectedMin, int expectedMax) {
+        Optional<VersionRange> featureRange = features.quorumSupportedFeature(name);
+        assertTrue(featureRange.isPresent());
+        assertEquals(expectedMin, featureRange.get().min());
+        assertEquals(expectedMax, featureRange.get().max());
+    }
+
+}
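As the test above shows, quorumSupportedFeature() reports the intersection of the ranges advertised by every quorum member, so one downgraded node narrows the quorum-wide range and one silent node empties it. A small usage sketch, reusing the quorumFeatures instance from the test:

    // Sketch: after node 1 advertises "bar" as 3-4 while the local node
    // supports 3-5, the quorum-wide range collapses to the intersection.
    Optional<VersionRange> bar = quorumFeatures.quorumSupportedFeature("bar");
    bar.ifPresent(range -> System.out.printf("bar: %d-%d%n", range.min(), range.max()));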
diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index 52388fbf33..0812048bb0 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -46,7 +47,7 @@ public class FeaturesImageTest {
         map1.put("foo", (short) 2);
         map1.put("bar", (short) 1);
         map1.put("baz", (short) 8);
-        IMAGE1 = new FeaturesImage(map1);
+        IMAGE1 = new FeaturesImage(map1, MetadataVersion.latest());
 
         DELTA1_RECORDS = new ArrayList<>();
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
@@ -62,7 +63,7 @@ public class FeaturesImageTest {
 
         Map<String, Short> map2 = new HashMap<>();
         map2.put("foo", (short) 3);
-        IMAGE2 = new FeaturesImage(map2);
+        IMAGE2 = new FeaturesImage(map2, MetadataVersion.latest());
     }
 
     @Test
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java b/server-common/src/main/java/org/apache/kafka/metadata/FeatureLevelListener.java
similarity index 64%
copy from server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java
copy to server-common/src/main/java/org/apache/kafka/metadata/FeatureLevelListener.java
index c4255946ba..de91e268a2 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java
+++ b/server-common/src/main/java/org/apache/kafka/metadata/FeatureLevelListener.java
@@ -15,19 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.server.common;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.Test;
-
-public class MetadataVersionValidatorTest {
-
-    @Test
-    public void testMetadataVersionValidator() {
-        String str = new MetadataVersionValidator().toString();
-        String[] apiVersions = str.substring(1).split(",");
-        assertEquals(MetadataVersion.VALUES.length, apiVersions.length);
-    }
+package org.apache.kafka.metadata;
 
+/**
+ * A callback for changes to feature levels. Currently, this is only used by the controller to receive a callback
+ * when committed FeatureLevelRecords are being replayed.
+ */
+public interface FeatureLevelListener {
+    void handle(String featureName, short finalizedVersion);
 }
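Since FeatureLevelListener declares a single method, implementations can be plain lambdas. A hypothetical usage; the slf4j logger and the point of registration are assumed for illustration:

    // Hypothetical: react when a committed FeatureLevelRecord for
    // metadata.version is replayed.
    FeatureLevelListener listener = (featureName, finalizedVersion) -> {
        if (MetadataVersion.FEATURE_NAME.equals(featureName)) {
            log.info("metadata.version finalized at feature level {}", finalizedVersion);
        }
    };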
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index e95b9248ec..455e051541 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -43,6 +43,8 @@ import org.apache.kafka.common.record.RecordVersion;
  * released version, they can use "0.10.0" when upgrading to the 0.10.0 release.
  */
 public enum MetadataVersion {
+    UNINITIALIZED(-1, "0.0", ""),
+
     IBP_0_8_0(-1, "0.8.0", ""),
     IBP_0_8_1(-1, "0.8.1", ""),
     IBP_0_8_2(-1, "0.8.2", ""),
@@ -138,38 +140,46 @@ public enum MetadataVersion {
     IBP_2_8_IV1(-1, "2.8", "IV1"),
 
     // Introduce AllocateProducerIds (KIP-730)
-    IBP_3_0_IV0(1, "3.0", "IV0"),
+    IBP_3_0_IV0(1, "3.0", "IV0", true),
 
     // Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
     // Assume message format version is 3.0 (KIP-724)
-    IBP_3_0_IV1(2, "3.0", "IV1"),
+    IBP_3_0_IV1(2, "3.0", "IV1", false),
 
     // Adds topic IDs to Fetch requests/responses (KIP-516)
-    IBP_3_1_IV0(3, "3.1", "IV0"),
+    IBP_3_1_IV0(3, "3.1", "IV0", false),
 
     // Support for leader recovery for unclean leader election (KIP-704)
-    IBP_3_2_IV0(4, "3.2", "IV0");
+    IBP_3_2_IV0(4, "3.2", "IV0", false),
+
+    // Support for metadata.version feature flag (KIP-778)
+    IBP_3_3_IV0(5, "3.3", "IV0", false);
+
+    public static final String FEATURE_NAME = "metadata.version";
+
+    public static final MetadataVersion[] VERSIONS;
 
-    public static final MetadataVersion[] VALUES = MetadataVersion.values();
-    private final Optional<Short> featureLevel;
+    private final short featureLevel;
     private final String release;
     private final String ibpVersion;
+    private final boolean didMetadataChange;
 
     MetadataVersion(int featureLevel, String release, String subVersion) {
-        if (featureLevel > 0) {
-            this.featureLevel = Optional.of((short) featureLevel);
-        } else {
-            this.featureLevel = Optional.empty();
-        }
+        this(featureLevel, release, subVersion, true);
+    }
+
+    MetadataVersion(int featureLevel, String release, String subVersion, boolean didMetadataChange) {
+        this.featureLevel = (short) featureLevel;
         this.release = release;
         if (subVersion.isEmpty()) {
             this.ibpVersion = release;
         } else {
             this.ibpVersion = String.format("%s-%s", release, subVersion);
         }
+        this.didMetadataChange = didMetadataChange;
     }
 
-    public Optional<Short> featureLevel() {
+    public short featureLevel() {
         return featureLevel;
     }
 
@@ -201,6 +211,9 @@ public enum MetadataVersion {
         return this.isAtLeast(IBP_3_0_IV0);
     }
 
+    public boolean isKRaftSupported() {
+        return this.featureLevel > 0;
+    }
 
     public RecordVersion highestSupportedRecordVersion() {
         if (this.isLessThan(IBP_0_10_0_IV0)) {
@@ -215,9 +228,13 @@ public enum MetadataVersion {
     private static final Map<String, MetadataVersion> IBP_VERSIONS;
     static {
         {
+            // Make a copy of values() and omit UNINITIALIZED
+            MetadataVersion[] enumValues = MetadataVersion.values();
+            VERSIONS = Arrays.copyOfRange(enumValues, 1, enumValues.length);
+
             IBP_VERSIONS = new HashMap<>();
             Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
-            for (MetadataVersion metadataVersion : VALUES) {
+            for (MetadataVersion metadataVersion : VERSIONS) {
                 maxInterVersion.put(metadataVersion.release, metadataVersion);
                 IBP_VERSIONS.put(metadataVersion.ibpVersion, metadataVersion);
             }
@@ -233,6 +250,19 @@ public enum MetadataVersion {
         return ibpVersion;
     }
 
+    public boolean didMetadataChange() {
+        return didMetadataChange;
+    }
+
+    Optional<MetadataVersion> previous() {
+        int idx = this.ordinal();
+        if (idx > 1) {
+            return Optional.of(VERSIONS[idx - 2]);
+        } else {
+            return Optional.empty();
+        }
+    }
+
     /**
      * Return an `MetadataVersion` instance for `versionString`, which can be in a variety of formats (e.g. "0.8.0", "0.8.0.x",
      * "0.10.0", "0.10.0-IV1"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `MetadataVersion`.
@@ -253,6 +283,15 @@ public enum MetadataVersion {
         );
     }
 
+    public static MetadataVersion fromFeatureLevel(short version) {
+        for (MetadataVersion metadataVersion: MetadataVersion.values()) {
+            if (metadataVersion.featureLevel() == version) {
+                return metadataVersion;
+            }
+        }
+        throw new IllegalArgumentException("No MetadataVersion with metadata version " + version);
+    }
+
     /**
      * Return the minimum `MetadataVersion` that supports `RecordVersion`.
      */
@@ -270,7 +309,36 @@ public enum MetadataVersion {
     }
 
     public static MetadataVersion latest() {
-        return VALUES[VALUES.length - 1];
+        return VERSIONS[VERSIONS.length - 1];
+    }
+
+    public static boolean checkIfMetadataChanged(MetadataVersion sourceVersion, MetadataVersion targetVersion) {
+        if (sourceVersion == targetVersion) {
+            return false;
+        }
+
+        final MetadataVersion highVersion, lowVersion;
+        if (sourceVersion.compareTo(targetVersion) < 0) {
+            highVersion = targetVersion;
+            lowVersion = sourceVersion;
+        } else {
+            highVersion = sourceVersion;
+            lowVersion = targetVersion;
+        }
+        return checkIfMetadataChangedOrdered(highVersion, lowVersion);
+    }
+
+    private static boolean checkIfMetadataChangedOrdered(MetadataVersion highVersion, MetadataVersion lowVersion) {
+        MetadataVersion version = highVersion;
+        while (!version.didMetadataChange() && version != lowVersion) {
+            Optional<MetadataVersion> prev = version.previous();
+            if (prev.isPresent()) {
+                version = prev.get();
+            } else {
+                break;
+            }
+        }
+        return version != lowVersion;
     }
 
     public boolean isAtLeast(MetadataVersion otherVersion) {
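checkIfMetadataChanged() normalizes argument order and then walks down from the higher version, returning true exactly when some version above the lower bound, up to and including the higher one, has its didMetadataChange flag set. A few illustrative calls, following the enum definitions above:

    // 3.0-IV1, 3.1-IV0 and 3.2-IV0 introduce no metadata record changes, so
    // no change is reported between them; crossing 3.0-IV0 reports one.
    MetadataVersion v = MetadataVersion.fromFeatureLevel((short) 1);  // IBP_3_0_IV0
    boolean unchanged = MetadataVersion.checkIfMetadataChanged(
        MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_2_IV0);    // false
    boolean changed = MetadataVersion.checkIfMetadataChanged(
        MetadataVersion.IBP_2_8_IV1, MetadataVersion.IBP_3_2_IV0);    // true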
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java
index d685dd0187..072f956eb8 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java
@@ -34,7 +34,7 @@ public class MetadataVersionValidator implements Validator {
 
     @Override
     public String toString() {
-        return "[" + Arrays.stream(MetadataVersion.VALUES).map(MetadataVersion::version).collect(
+        return "[" + Arrays.stream(MetadataVersion.VERSIONS).map(MetadataVersion::version).collect(
              Collectors.joining(", ")) + "]";
     }
 }
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 2293572b28..8a825e3da2 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -63,23 +63,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class MetadataVersionTest {
 
     @Test
     public void testFeatureLevel() {
-        int firstFeatureLevelIndex = Arrays.asList(MetadataVersion.VALUES).indexOf(IBP_3_0_IV0);
+        MetadataVersion[] metadataVersions = MetadataVersion.VERSIONS;
+        int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(IBP_3_0_IV0);
         for (int i = 0; i < firstFeatureLevelIndex; i++) {
-            assertFalse(MetadataVersion.VALUES[i].featureLevel().isPresent());
+            assertTrue(metadataVersions[i].featureLevel() < 0);
         }
         short expectedFeatureLevel = 1;
-        for (int i = firstFeatureLevelIndex; i < MetadataVersion.VALUES.length; i++) {
-            MetadataVersion metadataVersion = MetadataVersion.VALUES[i];
-            short featureLevel = metadataVersion.featureLevel().orElseThrow(() ->
-                new IllegalArgumentException(
-                    String.format("Metadata version %s must have a non-null feature level", metadataVersion.version())));
-            assertEquals(expectedFeatureLevel, featureLevel,
-                String.format("Metadata version %s should have feature level %s", metadataVersion.version(), expectedFeatureLevel));
+        for (int i = firstFeatureLevelIndex; i < metadataVersions.length; i++) {
+            MetadataVersion metadataVersion = metadataVersions[i];
+            assertEquals(expectedFeatureLevel, metadataVersion.featureLevel(),
+                    String.format("Metadata version %s should have feature level %s", metadataVersion.featureLevel(), expectedFeatureLevel));
             expectedFeatureLevel += 1;
         }
     }
@@ -264,4 +263,44 @@ class MetadataVersionTest {
         assertEquals("3.2-IV0", IBP_3_2_IV0.version());
     }
 
+    @Test
+    public void testPrevious() {
+        for (int i = 1; i < MetadataVersion.VERSIONS.length - 2; i++) {
+            MetadataVersion version = MetadataVersion.VERSIONS[i];
+            assertTrue(version.previous().isPresent());
+            assertEquals(MetadataVersion.VERSIONS[i - 1], version.previous().get());
+        }
+    }
+
+    @Test
+    public void testMetadataChanged() {
+        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_2_IV0));
+        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_1_IV0));
+        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_0_IV1));
+        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_0_IV0));
+        assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_2_8_IV1));
+
+        // Check that argument order doesn't matter
+        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_0_IV0, IBP_3_2_IV0));
+        assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_2_8_IV1, IBP_3_2_IV0));
+    }
+
+    @Test
+    public void testKRaftVersions() {
+        for (MetadataVersion metadataVersion : MetadataVersion.VERSIONS) {
+            if (metadataVersion.isKRaftSupported()) {
+                assertTrue(metadataVersion.featureLevel() > 0);
+            } else {
+                assertEquals(-1, metadataVersion.featureLevel());
+            }
+        }
+
+        for (MetadataVersion metadataVersion : MetadataVersion.VERSIONS) {
+            if (metadataVersion.isAtLeast(IBP_3_0_IV0)) {
+                assertTrue(metadataVersion.isKRaftSupported());
+            } else {
+                assertFalse(metadataVersion.isKRaftSupported());
+            }
+        }
+    }
 }
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java
index c4255946ba..707d0d11d3 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionValidatorTest.java
@@ -27,7 +27,7 @@ public class MetadataVersionValidatorTest {
     public void testMetadataVersionValidator() {
         String str = new MetadataVersionValidator().toString();
         String[] apiVersions = str.substring(1).split(",");
-        assertEquals(MetadataVersion.VALUES.length, apiVersions.length);
+        assertEquals(MetadataVersion.VERSIONS.length, apiVersions.length);
     }
 
 }
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
index 1d99623044..58acf28b70 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -24,6 +24,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.util.SnapshotFileReader;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;