You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/06/13 18:23:38 UTC

[kafka] branch trunk updated: KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cc384054c6 KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)
cc384054c6 is described below

commit cc384054c6e63abd011f7687523b1292062b049d
Author: David Arthur <mu...@gmail.com>
AuthorDate: Mon Jun 13 14:23:28 2022 -0400

    KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)
    
    * Set the minimum supported MetadataVersion to 3.0-IV1
    * Remove MetadataVersion.UNINITIALIZED
    * Relocate RPC version mapping for fetch protocols into MetadataVersion
    * Replace static IBP calls with dynamic calls to MetadataCache
    
    A side effect of removing the UNINITIALIZED metadata version is that the FeatureControlManager and FeatureImage will initialize themselves with the minimum KRaft version (3.0-IV1).
    
    The rationale for setting the minimum version to 3.0-IV1 is so that we can avoid any cases of KRaft mode running with an old log message format (KIP-724 was introduced in 3.0-IV1). As a side-effect of increasing this minimum version, the feature level values decreased by one.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 .../main/scala/kafka/server/BrokerFeatures.scala   |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 51 +++++-------
 .../main/scala/kafka/server/KafkaRaftServer.scala  | 16 ++--
 .../scala/kafka/server/RemoteLeaderEndPoint.scala  | 22 ++++--
 .../scala/kafka/server/ReplicaFetcherManager.scala |  9 ++-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |  8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  6 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  6 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  |  4 +-
 core/src/test/java/kafka/test/ClusterConfig.java   | 16 ++--
 .../java/kafka/test/ClusterTestExtensionsTest.java |  6 ++
 .../java/kafka/test/annotation/ClusterTest.java    |  2 +-
 .../kafka/test/junit/ClusterTestExtensions.java    |  8 +-
 .../test/junit/RaftClusterInvocationContext.java   |  3 +-
 .../test/junit/ZkClusterInvocationContext.java     |  2 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  2 +-
 .../transaction/ProducerIdsIntegrationTest.scala   |  2 +-
 .../server/MetadataVersionIntegrationTest.scala    |  9 ++-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 28 +++++++
 .../unit/kafka/server/KafkaRaftServerTest.scala    | 55 ++++++++++++-
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 24 +++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 13 +++-
 .../metadata/BrokerMetadataListenerTest.scala      |  2 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  6 +-
 .../apache/kafka/controller/BootstrapMetadata.java | 27 ++++---
 .../kafka/controller/ClusterControlManager.java    |  6 +-
 .../kafka/controller/FeatureControlManager.java    | 24 ++++--
 .../apache/kafka/controller/QuorumController.java  | 17 ++--
 .../apache/kafka/controller/QuorumFeatures.java    |  4 +-
 .../java/org/apache/kafka/image/FeaturesImage.java | 11 ++-
 .../java/org/apache/kafka/image/MetadataImage.java |  5 --
 .../kafka/controller/BootstrapMetadataTest.java    | 16 ++--
 .../controller/ClusterControlManagerTest.java      |  2 +-
 .../controller/FeatureControlManagerTest.java      | 19 +++--
 .../kafka/controller/QuorumControllerTest.java     | 24 +++++-
 .../kafka/controller/QuorumControllerTestEnv.java  |  6 +-
 .../kafka/server/common/MetadataVersion.java       | 90 ++++++++++++++++++----
 .../kafka/server/common/MetadataVersionTest.java   |  8 +-
 38 files changed, 379 insertions(+), 182 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index d385f1eb07..70ef7c71cb 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -73,7 +73,7 @@ object BrokerFeatures extends Logging {
   def createDefault(): BrokerFeatures = {
     new BrokerFeatures(Features.supportedFeatures(
       java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
-        new SupportedVersionRange(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()))))
+        new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel()))))
   }
 
   def createEmpty(): BrokerFeatures = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3d7df18cbb..b4e0b9449c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1790,38 +1790,25 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   // We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
   // is passed, `0.10.0-IV0` may be picked)
   val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
-  val interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
-
-  val fetchRequestVersion: Short =
-    if (interBrokerProtocolVersion.isAtLeast(IBP_3_1_IV0)) 13
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_7_IV1)) 12
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 11
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV2)) 10
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 8
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_1_1_IV0)) 7
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV1)) 5
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 4
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV1)) 3
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_0_IV0)) 2
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_9_0)) 1
-    else 0
-
-  val offsetForLeaderEpochRequestVersion: Short =
-    if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 4
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 3
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 2
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV0)) 1
-    else 0
-
-  val listOffsetRequestVersion: Short =
-    if (interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)) 7
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 6
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_2_IV1)) 5
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 4
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 3
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 2
-    else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2)) 1
-    else 0
+  val interBrokerProtocolVersion = if (processRoles.isEmpty) {
+    MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
+  } else {
+    if (originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
+      // A user-supplied IBP was given
+      val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
+      if (!configuredVersion.isKRaftSupported) {
+        throw new ConfigException(s"A non-KRaft version ${interBrokerProtocolVersionString} given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
+          s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
+      } else {
+        warn(s"${KafkaConfig.InterBrokerProtocolVersionProp} is deprecated in KRaft mode as of 3.3 and will only " +
+          s"be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting " +
+          s"the metadata version for a new KRaft cluster.")
+      }
+    }
+    // In KRaft mode, we pin this value to the minimum KRaft-supported version. This prevents inadvertent usage of
+    // the static IBP config in broker components running in KRaft mode
+    MetadataVersion.MINIMUM_KRAFT_VERSION
+  }
 
   /** ********* Controlled shutdown configuration ***********/
   val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 5a1c3087d3..f1474430b9 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.nio.file.Paths
 import scala.collection.Seq
+import scala.compat.java8.FunctionConverters.asJavaSupplier
 import scala.jdk.CollectionConverters._
 
 /**
@@ -180,13 +181,16 @@ object KafkaRaftServer {
           "If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
     }
 
-    // Load the bootstrap metadata file or, in the case of an upgrade from KRaft preview, bootstrap the
-    // metadata.version corresponding to a user-configured IBP.
-    val bootstrapMetadata = if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
-      BootstrapMetadata.load(Paths.get(config.metadataLogDir), config.interBrokerProtocolVersion)
-    } else {
-      BootstrapMetadata.load(Paths.get(config.metadataLogDir), MetadataVersion.IBP_3_0_IV0)
+    // Load the bootstrap metadata file. In the case of an upgrade from older KRaft where there is no bootstrap metadata,
+    // read the IBP from config in order to bootstrap the equivalent metadata version.
+    def getUserDefinedIBPVersionOrThrow(): MetadataVersion = {
+      if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
+        MetadataVersion.fromVersionString(config.interBrokerProtocolVersionString)
+      } else {
+        throw new KafkaException(s"Cannot upgrade from KRaft version prior to 3.3 without first setting ${KafkaConfig.InterBrokerProtocolVersionProp} on each broker.")
+      }
     }
+    val bootstrapMetadata = BootstrapMetadata.load(Paths.get(config.metadataLogDir), asJavaSupplier(() => getUserDefinedIBPVersionOrThrow()))
 
     (metaProperties, bootstrapMetadata, offlineDirs.toSeq)
   }
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index a9ac51315a..826643a0f5 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
 
 import scala.jdk.CollectionConverters._
@@ -46,13 +47,16 @@ import scala.compat.java8.OptionConverters.RichOptionForJava8
  * @param brokerConfig Broker configuration
  * @param replicaManager A ReplicaManager
  * @param quota The quota, used when building a fetch request
+ * @param metadataVersionSupplier A supplier that returns the current MetadataVersion. This can change during
+ *                                runtime in KRaft mode.
  */
 class RemoteLeaderEndPoint(logPrefix: String,
                            blockingSender: BlockingSend,
                            private[server] val fetchSessionHandler: FetchSessionHandler, // visible for testing
                            brokerConfig: KafkaConfig,
                            replicaManager: ReplicaManager,
-                           quota: ReplicaQuota) extends LeaderEndPoint with Logging {
+                           quota: ReplicaQuota,
+                           metadataVersionSupplier: () => MetadataVersion) extends LeaderEndPoint with Logging {
 
   this.logIdent = logPrefix
 
@@ -61,7 +65,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
-  override val isTruncationOnFetchSupported = brokerConfig.interBrokerProtocolVersion.isTruncationOnFetchSupported
+  override def isTruncationOnFetchSupported = metadataVersionSupplier().isTruncationOnFetchSupported
 
   override def initiateClose(): Unit = blockingSender.initiateClose()
 
@@ -106,7 +110,8 @@ class RemoteLeaderEndPoint(logPrefix: String,
           .setPartitionIndex(topicPartition.partition)
           .setCurrentLeaderEpoch(currentLeaderEpoch)
           .setTimestamp(earliestOrLatest)))
-    val requestBuilder = ListOffsetsRequest.Builder.forReplica(brokerConfig.listOffsetRequestVersion, brokerConfig.brokerId)
+    val metadataVersion = metadataVersionSupplier()
+    val requestBuilder = ListOffsetsRequest.Builder.forReplica(metadataVersion.listOffsetRequestVersion, brokerConfig.brokerId)
       .setTargetTimes(Collections.singletonList(topic))
 
     val clientResponse = blockingSender.sendRequest(requestBuilder)
@@ -116,7 +121,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
 
     Errors.forCode(responsePartition.errorCode) match {
       case Errors.NONE =>
-        if (brokerConfig.interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2))
+        if (metadataVersion.isAtLeast(IBP_0_10_1_IV2))
           responsePartition.offset
         else
           responsePartition.oldStyleOffsets.get(0)
@@ -141,7 +146,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
     }
 
     val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(
-      brokerConfig.offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
+      metadataVersionSupplier().offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
     debug(s"Sending offset for leader epoch request $epochRequest")
 
     try {
@@ -201,7 +206,12 @@ class RemoteLeaderEndPoint(logPrefix: String,
     val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {
       None
     } else {
-      val version: Short = if (brokerConfig.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else brokerConfig.fetchRequestVersion
+      val metadataVersion = metadataVersionSupplier()
+      val version: Short = if (metadataVersion.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) {
+        12
+      } else {
+        metadataVersion.fetchRequestVersion
+      }
       val requestBuilder = FetchRequest.Builder
         .forReplica(version, brokerConfig.brokerId, maxWait, minBytes, fetchData.toSend)
         .setMaxBytes(maxBytes)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index feb082c5ae..33af5836cd 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -21,13 +21,15 @@ import kafka.cluster.BrokerEndPoint
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.server.common.MetadataVersion
 
 class ReplicaFetcherManager(brokerConfig: KafkaConfig,
                             protected val replicaManager: ReplicaManager,
                             metrics: Metrics,
                             time: Time,
                             threadNamePrefix: Option[String] = None,
-                            quotaManager: ReplicationQuotaManager)
+                            quotaManager: ReplicationQuotaManager,
+                            metadataVersionSupplier: () => MetadataVersion)
       extends AbstractFetcherManager[ReplicaFetcherThread](
         name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
         clientId = "Replica",
@@ -41,9 +43,10 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
     val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,
       s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
     val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
-    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig, replicaManager, quotaManager)
+    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
+      replicaManager, quotaManager, metadataVersionSupplier)
     new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
-      quotaManager, logContext.logPrefix)
+      quotaManager, logContext.logPrefix, metadataVersionSupplier)
   }
 
   def shutdown(): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 86cc6b1b9d..2e728ce817 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,6 +21,7 @@ import kafka.log.{LeaderOffsetIncremented, LogAppendInfo}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.server.common.MetadataVersion
 
 class ReplicaFetcherThread(name: String,
                            leader: LeaderEndPoint,
@@ -28,7 +29,8 @@ class ReplicaFetcherThread(name: String,
                            failedPartitions: FailedPartitions,
                            replicaMgr: ReplicaManager,
                            quota: ReplicaQuota,
-                           logPrefix: String)
+                           logPrefix: String,
+                           metadataVersionSupplier: () => MetadataVersion)
   extends AbstractFetcherThread(name = name,
                                 clientId = name,
                                 leader = leader,
@@ -39,7 +41,7 @@ class ReplicaFetcherThread(name: String,
 
   this.logIdent = logPrefix
 
-  override protected val isOffsetForLeaderEpochSupported: Boolean = brokerConfig.interBrokerProtocolVersion.isOffsetForLeaderEpochSupported
+  override protected val isOffsetForLeaderEpochSupported: Boolean = metadataVersionSupplier().isOffsetForLeaderEpochSupported
 
   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
     replicaMgr.localLogOrException(topicPartition).latestEpoch
@@ -135,7 +137,7 @@ class ReplicaFetcherThread(name: String,
 
   def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
     // oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
-    if (brokerConfig.fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
+    if (metadataVersionSupplier().fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
       error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
         "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
         "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 03983ad98d..aff10ee903 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -310,7 +310,7 @@ class ReplicaManager(val config: KafkaConfig,
     // If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
     // In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
     // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
-    val haltBrokerOnFailure = config.interBrokerProtocolVersion.isLessThan(IBP_1_0_IV0)
+    val haltBrokerOnFailure = metadataCache.metadataVersion().isLessThan(IBP_1_0_IV0)
     logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
     logDirFailureHandler.start()
   }
@@ -1773,7 +1773,7 @@ class ReplicaManager(val config: KafkaConfig,
    * OffsetForLeaderEpoch request.
    */
   protected def initialFetchOffset(log: UnifiedLog): Long = {
-    if (config.interBrokerProtocolVersion.isTruncationOnFetchSupported() && log.latestEpoch.nonEmpty)
+    if (metadataCache.metadataVersion().isTruncationOnFetchSupported && log.latestEpoch.nonEmpty)
       log.logEndOffset
     else
       log.highWatermark
@@ -1903,7 +1903,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
-    new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
+    new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion())
   }
 
   protected def createReplicaAlterLogDirsManager(quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 60165a399e..212f188504 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.MetadataVersion
 
 import scala.collection.mutable
 
@@ -132,10 +131,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       // Publish the new metadata image to the metadata cache.
       metadataCache.setImage(newImage)
 
-      val metadataVersionLogMsg = newImage.features().metadataVersion() match {
-        case MetadataVersion.UNINITIALIZED => "un-initialized metadata.version"
-        case mv: MetadataVersion => s"metadata.version ${mv.featureLevel()}"
-      }
+      val metadataVersionLogMsg = s"metadata.version ${newImage.features().metadataVersion()}"
 
       if (_firstPublish) {
         info(s"Publishing initial metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 333af86314..a96275cc27 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -49,7 +49,7 @@ object StorageTool extends Logging {
           val clusterId = namespace.getString("cluster_id")
           val metadataVersion = getMetadataVersion(namespace)
           if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a metadata version of at least 1.")
+            throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
           }
           val metaProperties = buildMetadataProperties(clusterId, config.get)
           val ignoreFormatted = namespace.getBoolean("ignore_formatted")
@@ -99,7 +99,7 @@ object StorageTool extends Logging {
       action(storeTrue())
     formatParser.addArgument("--release-version", "-r").
       action(store()).
-      help(s"A release version to use for the initial metadata.version. The default is (${MetadataVersion.latest().version()})")
+      help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.latest().version()}")
 
     parser.parseArgsOrFail(args)
   }
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
index 5830959283..8e9f7de96a 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -122,8 +122,8 @@ public class ClusterConfig {
         return Optional.ofNullable(trustStoreFile);
     }
 
-    public Optional<MetadataVersion> metadataVersion() {
-        return Optional.ofNullable(metadataVersion);
+    public MetadataVersion metadataVersion() {
+        return metadataVersion;
     }
 
     public Properties brokerServerProperties(int brokerId) {
@@ -133,7 +133,7 @@ public class ClusterConfig {
     public Map<String, String> nameTags() {
         Map<String, String> tags = new LinkedHashMap<>(4);
         name().ifPresent(name -> tags.put("Name", name));
-        metadataVersion().ifPresent(mv -> tags.put("MetadataVersion", mv.toString()));
+        tags.put("MetadataVersion", metadataVersion.toString());
         tags.put("Security", securityProtocol.name());
         listenerName().ifPresent(listener -> tags.put("Listener", listener));
         return tags;
@@ -150,11 +150,12 @@ public class ClusterConfig {
     }
 
     public static Builder defaultClusterBuilder() {
-        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT);
+        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latest());
     }
 
-    public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
-        return new Builder(type, brokers, controllers, autoStart, securityProtocol);
+    public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart,
+                                         SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
+        return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion);
     }
 
     public static class Builder {
@@ -168,12 +169,13 @@ public class ClusterConfig {
         private File trustStoreFile;
         private MetadataVersion metadataVersion;
 
-        Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
+        Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
             this.type = type;
             this.brokers = brokers;
             this.controllers = controllers;
             this.autoStart = autoStart;
             this.securityProtocol = securityProtocol;
+            this.metadataVersion = metadataVersion;
         }
 
         public Builder type(Type type) {
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 8c18451efd..33780f795e 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -25,6 +25,7 @@ import kafka.test.annotation.ClusterTestDefaults;
 import kafka.test.annotation.ClusterTests;
 import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -109,4 +110,9 @@ public class ClusterTestExtensionsTest {
         clusterInstance.start();
         Assertions.assertNotNull(clusterInstance.anyBrokerSocketServer());
     }
+
+    @ClusterTest
+    public void testDefaults(ClusterConfig config) {
+        Assertions.assertEquals(MetadataVersion.IBP_3_3_IV3, config.metadataVersion());
+    }
 }
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index b83df127f5..d1d3222a25 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -41,6 +41,6 @@ public @interface ClusterTest {
     String name() default "";
     SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
     String listener() default "";
-    MetadataVersion metadataVersion() default MetadataVersion.UNINITIALIZED;
+    MetadataVersion metadataVersion() default MetadataVersion.IBP_3_3_IV3;
     ClusterConfigProperty[] serverProperties() default {};
 }
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index f0c1d9bbda..bd69109c4b 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -25,7 +25,6 @@ import kafka.test.annotation.ClusterTemplate;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTests;
 import kafka.test.annotation.Type;
-import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
 import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
@@ -180,7 +179,8 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
                 throw new IllegalStateException();
         }
 
-        ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, annot.securityProtocol());
+        ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart,
+            annot.securityProtocol(), annot.metadataVersion());
         if (!annot.name().isEmpty()) {
             builder.name(annot.name());
         } else {
@@ -195,10 +195,6 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
             properties.put(property.key(), property.value());
         }
 
-        if (!annot.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
-            builder.metadataVersion(annot.metadataVersion());
-        }
-
         ClusterConfig config = builder.build();
         config.serverProperties().putAll(properties);
         type.invocationContexts(config, testInvocations);
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index f0ca98a5f2..73fe67836a 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.BrokerState;
-import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
 import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
 import org.junit.jupiter.api.extension.Extension;
@@ -86,7 +85,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
         return Arrays.asList(
             (BeforeTestExecutionCallback) context -> {
                 TestKitNodes nodes = new TestKitNodes.Builder().
-                        setBootstrapMetadataVersion(clusterConfig.metadataVersion().orElse(MetadataVersion.latest())).
+                        setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
                         setNumBrokerNodes(clusterConfig.numBrokers()).
                         setNumControllerNodes(clusterConfig.numControllers()).build();
                 nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 02f21906ed..d8375b0127 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -108,7 +108,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
                     @Override
                     public Properties serverConfig() {
                         Properties props = clusterConfig.serverProperties();
-                        clusterConfig.metadataVersion().ifPresent(mv -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), mv.version()));
+                        props.put(KafkaConfig.InterBrokerProtocolVersionProp(), metadataVersion().version());
                         return props;
                     }
 
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 1924579e17..a930bafde6 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -340,7 +340,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     StorageTool.formatCommand(out,
                             JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
                             properties,
-                            MetadataVersion.IBP_3_0_IV0,
+                            MetadataVersion.MINIMUM_KRAFT_VERSION,
                             false);
                 } finally {
                     for (String line : stream.toString().split(String.format("%n"))) {
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index 3b97ee8398..7d3203e930 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -46,7 +46,7 @@ class ProducerIdsIntegrationTest {
   @ClusterTests(Array(
     new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
     new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
-    new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0)
+    new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV1)
   ))
   def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
     verifyUniqueIds(clusterInstance)
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index b671e5d5e3..c060e3a6da 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters._
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class MetadataVersionIntegrationTest {
   @ClusterTests(value = Array(
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV0),
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV1),
       new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_1_IV0),
       new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_2_IV0)
   ))
@@ -40,8 +40,8 @@ class MetadataVersionIntegrationTest {
     val admin = clusterInstance.createAdminClient()
     val describeResult = admin.describeFeatures()
     val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
-    assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
-    assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
+    assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
+    assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
 
     // Update to new version
     val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
@@ -71,7 +71,8 @@ class MetadataVersionIntegrationTest {
     val admin = clusterInstance.createAdminClient()
     val describeResult = admin.describeFeatures()
     val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
-    assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel())
+    assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel(),
+      "If this test fails, check the default MetadataVersion in the @ClusterTest annotation")
     assertEquals(ff.maxVersionLevel(), MetadataVersion.latest().featureLevel())
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1dd0a9ebc8..ee638ba893 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1570,4 +1570,32 @@ class KafkaConfigTest {
       "contained in listeners or controller.listener.names",
         assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
   }
+
+  @Test
+  def testIgnoreUserInterBrokerProtocolVersionKRaft(): Unit = {
+    for (ibp <- Seq("3.0", "3.1", "3.2")) {
+      val props = new Properties()
+      props.putAll(kraftProps())
+      props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, ibp)
+      val config = new KafkaConfig(props)
+      assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
+    }
+  }
+
+  @Test
+  def testInvalidInterBrokerProtocolVersionKRaft(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
+    props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "2.8")
+    assertEquals("A non-KRaft version 2.8 given for inter.broker.protocol.version. The minimum version is 3.0-IV1",
+      assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
+  }
+
+  @Test
+  def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
+    val config = new KafkaConfig(props)
+    assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index f997455f0b..17483e58a6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -24,6 +24,7 @@ import kafka.log.UnifiedLog
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -71,12 +72,13 @@ class KafkaRaftServerTest {
 
   private def invokeLoadMetaProperties(
     metaProperties: MetaProperties,
-    configProperties: Properties
+    configProperties: Properties,
+    metadataVersion: Option[MetadataVersion] = Some(MetadataVersion.latest())
   ): (MetaProperties, BootstrapMetadata, collection.Seq[String]) = {
     val tempLogDir = TestUtils.tempDirectory()
     try {
       writeMetaProperties(tempLogDir, metaProperties)
-
+      metadataVersion.foreach(mv => writeBootstrapMetadata(tempLogDir, mv))
       configProperties.put(KafkaConfig.LogDirProp, tempLogDir.getAbsolutePath)
       val config = KafkaConfig.fromProps(configProperties)
       KafkaRaftServer.initializeLogDirs(config)
@@ -94,6 +96,11 @@ class KafkaRaftServerTest {
     checkpoint.write(metaProperties.toProperties)
   }
 
+  private def writeBootstrapMetadata(logDir: File, metadataVersion: MetadataVersion): Unit = {
+    val bootstrapMetadata = BootstrapMetadata.create(metadataVersion)
+    BootstrapMetadata.write(bootstrapMetadata, logDir.toPath)
+  }
+
   @Test
   def testStartupFailsIfMetaPropertiesMissingInSomeLogDir(): Unit = {
     val clusterId = clusterIdBase64
@@ -147,6 +154,7 @@ class KafkaRaftServerTest {
     // One log dir is online and has properly formatted `meta.properties`
     val validDir = TestUtils.tempDirectory()
     writeMetaProperties(validDir, MetaProperties(clusterId, nodeId))
+    writeBootstrapMetadata(validDir, MetadataVersion.latest())
 
     // Use a regular file as an invalid log dir to trigger an IO error
     val invalidDir = TestUtils.tempFile("blah")
@@ -215,4 +223,47 @@ class KafkaRaftServerTest {
       () => KafkaRaftServer.initializeLogDirs(config))
   }
 
+  @Test
+  def testKRaftUpdateWithIBP(): Unit = {
+    val clusterId = clusterIdBase64
+    val nodeId = 0
+    val metaProperties = MetaProperties(clusterId, nodeId)
+
+    val configProperties = new Properties
+    configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+    configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+    configProperties.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
+    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    configProperties.put(KafkaConfig.InterBrokerProtocolVersionProp, "3.2")
+
+    val (loadedMetaProperties, bootstrapMetadata, offlineDirs) =
+      invokeLoadMetaProperties(metaProperties, configProperties, None)
+
+    assertEquals(metaProperties, loadedMetaProperties)
+    assertEquals(Seq.empty, offlineDirs)
+    assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.IBP_3_2_IV0)
+  }
+
+  @Test
+  def testKRaftUpdateWithoutIBP(): Unit = {
+    val clusterId = clusterIdBase64
+    val nodeId = 0
+    val metaProperties = MetaProperties(clusterId, nodeId)
+
+    val logDir = TestUtils.tempDirectory()
+    writeMetaProperties(logDir, metaProperties)
+
+    val configProperties = new Properties
+    configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+    configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+    configProperties.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
+    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    configProperties.put(KafkaConfig.LogDirProp, logDir.getAbsolutePath)
+
+    val config = KafkaConfig.fromProps(configProperties)
+    assertEquals("Cannot upgrade from KRaft version prior to 3.3 without first setting inter.broker.protocol.version on each broker.",
+      assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config)).getMessage)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index e18dd29d69..c7a222c2d5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -103,14 +103,16 @@ class ReplicaFetcherThreadTest {
                                          leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = {
     val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, fetcherId=$fetcherId] ")
     val fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id)
-    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler, brokerConfig, replicaMgr, quota)
+    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler,
+      brokerConfig, replicaMgr, quota, () => brokerConfig.interBrokerProtocolVersion)
     new ReplicaFetcherThread(name,
       leader,
       brokerConfig,
       failedPartitions,
       replicaMgr,
       quota,
-      logContext.logPrefix)
+      logContext.logPrefix,
+      () => brokerConfig.interBrokerProtocolVersion)
   }
 
   @Test
@@ -121,9 +123,9 @@ class ReplicaFetcherThreadTest {
     val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
     when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))
 
-    assertEquals(ApiKeys.FETCH.latestVersion, config.fetchRequestVersion)
-    assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, config.offsetForLeaderEpochRequestVersion)
-    assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, config.listOffsetRequestVersion)
+    assertEquals(ApiKeys.FETCH.latestVersion, config.interBrokerProtocolVersion.fetchRequestVersion())
+    assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, config.interBrokerProtocolVersion.offsetForLeaderEpochRequestVersion)
+    assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, config.interBrokerProtocolVersion.listOffsetRequestVersion)
   }
 
   @Test
@@ -581,8 +583,10 @@ class ReplicaFetcherThreadTest {
     val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, new SystemTime())
     val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
     val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
-    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config, replicaManager, quota)
-    val thread = new ReplicaFetcherThread("bob", leader, config, failedPartitions, replicaManager, quota, logContext.logPrefix) {
+    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config,
+      replicaManager, quota, () => config.interBrokerProtocolVersion)
+    val thread = new ReplicaFetcherThread("bob", leader, config, failedPartitions,
+      replicaManager, quota, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
       override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = None
     }
     thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), initialLEO), t1p1 -> initialFetchState(Some(topicId1), initialLEO)))
@@ -1036,14 +1040,16 @@ class ReplicaFetcherThreadTest {
 
     val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
     val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
-    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockBlockingSend, fetchSessionHandler, config, replicaManager, replicaQuota)
+    val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockBlockingSend, fetchSessionHandler, config,
+      replicaManager, replicaQuota, () => config.interBrokerProtocolVersion)
     val thread = new ReplicaFetcherThread("bob",
       leader,
       config,
       failedPartitions,
       replicaManager,
       replicaQuota,
-      logContext.logPrefix)
+      logContext.logPrefix,
+      () => config.interBrokerProtocolVersion)
 
     val leaderEpoch = 1
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c2c8b4ddc7..ab0649a6af 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -214,6 +214,7 @@ class ReplicaManagerTest {
     val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
     mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
     val rm = new ReplicaManager(
       metrics = metrics,
       config = config,
@@ -1964,7 +1965,7 @@ class ReplicaManagerTest {
       any[TopicPartition], any[ListenerName])).
         thenReturn(Map(leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
           followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap)
-
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       purgatoryName = "Produce", timer, reaperEnabled = false)
     val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@@ -2003,15 +2004,16 @@ class ReplicaManagerTest {
                                                          time: Time,
                                                          threadNamePrefix: Option[String],
                                                          replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
-        new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager) {
+        new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion()) {
 
           override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
             val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " +
               s"fetcherId=$fetcherId] ")
             val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
-            val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config, replicaManager, quotaManager.follower)
+            val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config,
+              replicaManager, quotaManager.follower, () => config.interBrokerProtocolVersion)
             new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, replicaManager,
-              quotaManager.follower, logContext.logPrefix) {
+              quotaManager.follower, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
               override def doWork(): Unit = {
                 // In case the thread starts before the partition is added by AbstractFetcherManager,
                 // add it here (it's a no-op if already added)
@@ -2235,6 +2237,7 @@ class ReplicaManagerTest {
     when(metadataCache.topicIdInfo()).thenReturn((topicIds.asJava, topicNames.asJava))
     when(metadataCache.topicNamesToIds()).thenReturn(topicIds.asJava)
     when(metadataCache.topicIdsToNames()).thenReturn(topicNames.asJava)
+    when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
     mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       purgatoryName = "Produce", timer, reaperEnabled = false)
@@ -2487,6 +2490,8 @@ class ReplicaManagerTest {
     val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
     mockGetAliveBrokerFunctions(metadataCache0, aliveBrokers)
     mockGetAliveBrokerFunctions(metadataCache1, aliveBrokers)
+    when(metadataCache0.metadataVersion()).thenReturn(config0.interBrokerProtocolVersion)
+    when(metadataCache1.metadataVersion()).thenReturn(config1.interBrokerProtocolVersion)
 
     // each replica manager is for a broker
     val rm0 = new ReplicaManager(
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 948e051337..f75823a029 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -74,7 +74,7 @@ class BrokerMetadataListenerTest {
         )
       )
       val imageRecords = listener.getImageRecords().get()
-      assertEquals(0, imageRecords.size())
+      assertEquals(1, imageRecords.size())
       assertEquals(100L, listener.highestMetadataOffset)
       assertEquals(0L, metrics.lastAppliedRecordOffset.get)
       assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 86400ba1e9..b0f36522f3 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -325,7 +325,8 @@ public class ReplicaFetcherThreadBenchmark {
                                     new LogContext(String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)), 3),
                             config,
                             replicaManager,
-                            replicaQuota
+                            replicaQuota,
+                            config::interBrokerProtocolVersion
                     ) {
                         @Override
                         public long fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
@@ -356,7 +357,8 @@ public class ReplicaFetcherThreadBenchmark {
                     new FailedPartitions(),
                     replicaManager,
                     replicaQuota,
-                    String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)
+                    String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3),
+                    config::interBrokerProtocolVersion
             );
 
             pool = partitions;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
index fa031c525f..d9d0651a19 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 
@@ -128,7 +129,9 @@ public class BootstrapMetadata {
 
     public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
         if (!metadataVersion.isKRaftSupported()) {
-            throw new IllegalArgumentException("Cannot create BootstrapMetadata with a non-KRaft metadata version.");
+            throw new IllegalArgumentException(String.format(
+                "Cannot create BootstrapMetadata with a non-KRaft metadata version %s. Minimum version is %s",
+                metadataVersion, MetadataVersion.MINIMUM_KRAFT_VERSION));
         }
         records.add(new ApiMessageAndVersion(
             new FeatureLevelRecord()
@@ -142,18 +145,24 @@ public class BootstrapMetadata {
     /**
      * Load a bootstrap snapshot into a read-only bootstrap metadata object and return it.
      *
-     * @param bootstrapDir  The directory from which to read the snapshot file.
-     * @param fallbackPreviewVersion    The metadata.version to boostrap if upgrading from KRaft preview
-     * @return              The read-only bootstrap metadata
+     * @param bootstrapDir              The directory from which to read the snapshot file.
+     * @param fallbackVersionSupplier   A function that returns the metadata.version to use when upgrading from an older KRaft
+     * @return                          The read-only bootstrap metadata
      * @throws Exception
      */
-    public static BootstrapMetadata load(Path bootstrapDir, MetadataVersion fallbackPreviewVersion) throws Exception {
+    public static BootstrapMetadata load(Path bootstrapDir, Supplier<MetadataVersion> fallbackVersionSupplier) throws Exception {
         final Path bootstrapPath = bootstrapDir.resolve(BOOTSTRAP_FILE);
 
         if (!Files.exists(bootstrapPath)) {
-            log.debug("Missing bootstrap file, this appears to be a KRaft preview cluster. Setting metadata.version to {}.",
-                fallbackPreviewVersion.featureLevel());
-            return BootstrapMetadata.create(fallbackPreviewVersion);
+            // Upgrade scenario from KRaft prior to 3.3 (i.e., no bootstrap metadata present)
+            MetadataVersion fallbackVersion = fallbackVersionSupplier.get();
+            if (fallbackVersion.isKRaftSupported()) {
+                log.debug("Missing bootstrap file, this appears to be a KRaft cluster older than 3.3. Setting metadata.version to {}.",
+                    fallbackVersion.featureLevel());
+                return BootstrapMetadata.create(fallbackVersion);
+            } else {
+                throw new Exception(String.format("Could not set fallback bootstrap metadata with non-KRaft metadata version of %s", fallbackVersion));
+            }
         }
 
         BootstrapListener listener = new BootstrapListener();
@@ -182,7 +191,7 @@ public class BootstrapMetadata {
         if (metadataVersionRecord.isPresent()) {
             return new BootstrapMetadata(MetadataVersion.fromFeatureLevel(metadataVersionRecord.get().featureLevel()), listener.records);
         } else {
-            throw new RuntimeException("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
+            throw new Exception("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 21bd01c3b9..859dedda22 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -590,11 +590,7 @@ public class ClusterControlManager {
 
         ClusterControlIterator(long epoch) {
             this.iterator = brokerRegistrations.entrySet(epoch).iterator();
-            if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
-                this.metadataVersion = MetadataVersion.IBP_3_0_IV1;
-            } else {
-                this.metadataVersion = featureControl.metadataVersion();
-            }
+            this.metadataVersion = featureControl.metadataVersion();
         }
 
         @Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 19127afa72..c092abcdcc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -52,7 +52,7 @@ public class FeatureControlManager {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
         private QuorumFeatures quorumFeatures = null;
-        private MetadataVersion metadataVersion = MetadataVersion.UNINITIALIZED;
+        private MetadataVersion metadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION;
 
         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -105,6 +105,10 @@ public class FeatureControlManager {
      */
     private final TimelineObject<MetadataVersion> metadataVersion;
 
+    /**
+     * A boolean to see if we have encountered a metadata.version or not.
+     */
+    private final TimelineObject<Boolean> sawMetadataVersion;
 
     private FeatureControlManager(
         LogContext logContext,
@@ -116,6 +120,7 @@ public class FeatureControlManager {
         this.quorumFeatures = quorumFeatures;
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
         this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
+        this.sawMetadataVersion = new TimelineObject<>(snapshotRegistry, false);
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
@@ -226,7 +231,7 @@ public class FeatureControlManager {
             return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
         }
 
-        if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && newVersion.isLessThan(currentVersion)) {
+        if (newVersion.isLessThan(currentVersion)) {
             // This is a downgrade
             boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
             if (!metadataChanged) {
@@ -257,15 +262,20 @@ public class FeatureControlManager {
 
     FinalizedControllerFeatures finalizedFeatures(long epoch) {
         Map<String, Short> features = new HashMap<>();
-        if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) {
-            features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
-        }
+        features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
         for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
             features.put(entry.getKey(), entry.getValue());
         }
         return new FinalizedControllerFeatures(features, epoch);
     }
 
+    /**
+     * @return true if a FeatureLevelRecord for "metadata.version" has been replayed. False otherwise
+     */
+    boolean sawMetadataVersion() {
+        return this.sawMetadataVersion.get();
+    }
+
     public void replay(FeatureLevelRecord record) {
         VersionRange range = quorumFeatures.localSupportedFeature(record.name());
         if (!range.contains(record.featureLevel())) {
@@ -275,6 +285,7 @@ public class FeatureControlManager {
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
             log.info("Setting metadata.version to {}", record.featureLevel());
             metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
+            sawMetadataVersion.set(true);
         } else {
             if (record.featureLevel() == 0) {
                 log.info("Removing feature {}", record.name());
@@ -294,9 +305,6 @@ public class FeatureControlManager {
         FeatureControlIterator(long epoch) {
             this.iterator = finalizedVersions.entrySet(epoch).iterator();
             this.metadataVersion = FeatureControlManager.this.metadataVersion.get(epoch);
-            if (this.metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
-                this.wroteVersion = true;
-            }
         }
 
         @Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 4a8c8c9661..88146766e3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -283,7 +283,7 @@ public final class QuorumController implements Controller {
         public QuorumController build() throws Exception {
             if (raftClient == null) {
                 throw new IllegalStateException("You must set a raft client.");
-            } else if (bootstrapMetadata == null || bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+            } else if (bootstrapMetadata == null) {
                 throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
             } else if (quorumFeatures == null) {
                 throw new IllegalStateException("You must specify the quorum features");
@@ -932,21 +932,21 @@ public final class QuorumController implements Controller {
                     // write any other records to the log since we need the metadata.version to determine the correct
                     // record version
                     final MetadataVersion metadataVersion;
-                    if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+                    if (!featureControl.sawMetadataVersion()) {
                         final CompletableFuture<Map<String, ApiError>> future;
                         if (!bootstrapMetadata.metadataVersion().isKRaftSupported()) {
-                            metadataVersion = MetadataVersion.UNINITIALIZED;
+                            metadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION;
                             future = new CompletableFuture<>();
                             future.completeExceptionally(
-                                new IllegalStateException("Cannot become leader without an initial metadata.version of " +
-                                    "at least 1. Got " + bootstrapMetadata.metadataVersion().featureLevel()));
+                                new IllegalStateException("Cannot become leader without a KRaft supported version. " +
+                                    "Got " + bootstrapMetadata.metadataVersion()));
                         } else {
                             metadataVersion = bootstrapMetadata.metadataVersion();
                             future = appendWriteEvent("bootstrapMetadata", OptionalLong.empty(), () -> {
                                 if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
                                     log.info("Initializing metadata.version to {}", metadataVersion.featureLevel());
                                 } else {
-                                    log.info("Upgrading from KRaft preview. Initializing metadata.version to {}",
+                                    log.info("Upgrading KRaft cluster and initializing metadata.version to {}",
                                         metadataVersion.featureLevel());
                                 }
                                 return ControllerResult.atomicOf(bootstrapMetadata.records(), null);
@@ -1964,6 +1964,11 @@ public final class QuorumController implements Controller {
         return curClaimEpoch;
     }
 
+    // Visible for testing
+    MetadataVersion metadataVersion() {
+        return featureControl.metadataVersion();
+    }
+
     @Override
     public void close() throws InterruptedException {
         queue.close();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 9b723515bd..36725c2518 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -72,7 +72,9 @@ public class QuorumFeatures {
 
     public static Map<String, VersionRange> defaultFeatureMap() {
         Map<String, VersionRange> features = new HashMap<>(1);
-        features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
+        features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
+            MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
+            MetadataVersion.latest().featureLevel()));
         return features;
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index 3843b59bc7..4cfb1260f1 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -38,7 +38,7 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
  * This class is thread-safe.
  */
 public final class FeaturesImage {
-    public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.UNINITIALIZED);
+    public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.MINIMUM_KRAFT_VERSION);
 
     private final Map<String, Short> finalizedVersions;
 
@@ -68,11 +68,10 @@ public final class FeaturesImage {
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
         List<ApiMessageAndVersion> batch = new ArrayList<>();
         // Write out the metadata.version record first, and then the rest of the finalized features
-        if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
-            batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-                setName(MetadataVersion.FEATURE_NAME).
-                setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
-        }
+        batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+
         for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
             if (entry.getKey().equals(MetadataVersion.FEATURE_NAME)) {
                 continue;
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index e3cd94a0cb..55d572127e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -121,12 +121,7 @@ public final class MetadataImage {
     }
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
-        // We use the minimum KRaft metadata version if this image does
-        // not have a specific version set.
         MetadataVersion metadataVersion = features.metadataVersion();
-        if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
-            metadataVersion = MetadataVersion.IBP_3_0_IV1;
-        }
         // Features should be written out first so we can include the metadata.version at the beginning of the
         // snapshot
         features.write(out);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
index 16e03a0501..4eb5d47478 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
@@ -34,28 +34,28 @@ public class BootstrapMetadataTest {
     @Test
     public void testWriteAndReadBootstrapFile() throws Exception {
         Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
-        BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0);
+        BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.MINIMUM_KRAFT_VERSION);
         BootstrapMetadata.write(metadata, tmpDir);
 
         assertTrue(Files.exists(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE)));
 
-        BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
+        BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION);
         assertEquals(metadata, newMetadata);
     }
 
     @Test
     public void testNoBootstrapFile() throws Exception {
         Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
-        BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
-        assertEquals(MetadataVersion.IBP_3_0_IV0, metadata.metadataVersion());
-        metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_2_IV0);
+        BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION);
+        assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, metadata.metadataVersion());
+        metadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.IBP_3_2_IV0);
         assertEquals(MetadataVersion.IBP_3_2_IV0, metadata.metadataVersion());
     }
 
     @Test
     public void testExistingBootstrapFile() throws Exception {
         Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
-        BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0), tmpDir);
+        BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.MINIMUM_KRAFT_VERSION), tmpDir);
         assertThrows(IOException.class, () -> {
             BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_1_IV0), tmpDir);
         });
@@ -65,7 +65,7 @@ public class BootstrapMetadataTest {
     public void testEmptyBootstrapFile() throws Exception {
         Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
         Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
-        assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
+        assertThrows(Exception.class, () -> BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION),
             "Should fail to load if no metadata.version is set");
     }
 
@@ -77,7 +77,7 @@ public class BootstrapMetadataTest {
         byte[] data = new byte[100];
         random.nextBytes(data);
         Files.write(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE), data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
-        assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
+        assertThrows(Exception.class, () -> BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION),
             "Should fail on invalid data");
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index cd44e2e678..c0b163180e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(value = 40)
 public class ClusterControlManagerTest {
     @ParameterizedTest
-    @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV0", "IBP_3_3_IV2"})
+    @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV1", "IBP_3_3_IV2"})
     public void testReplay(MetadataVersion metadataVersion) {
         MockTime time = new MockTime(0, 0, 0);
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index fd1e5af39e..4d4c471994 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -93,7 +93,7 @@ public class FeatureControlManagerTest {
             setSnapshotRegistry(snapshotRegistry).
             build();
         snapshotRegistry.getOrCreateSnapshot(-1);
-        assertEquals(new FinalizedControllerFeatures(Collections.emptyMap(), -1),
+        assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 1), -1),
             manager.finalizedFeatures(-1));
         assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
@@ -131,7 +131,7 @@ public class FeatureControlManagerTest {
                 build();
         manager.replay(record);
         snapshotRegistry.getOrCreateSnapshot(123);
-        assertEquals(new FinalizedControllerFeatures(versionMap("foo", 2), 123),
+        assertEquals(new FinalizedControllerFeatures(versionMap("metadata.version", 1, "foo", 2), 123),
             manager.finalizedFeatures(123));
     }
 
@@ -210,6 +210,9 @@ public class FeatureControlManagerTest {
                 Collections.emptyMap(), Collections.emptyMap(), false);
         RecordTestUtils.replayAll(manager, result.records());
         RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
+            Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
+                    setName("metadata.version").
+                    setFeatureLevel((short) 1), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName("foo").
                 setFeatureLevel((short) 5), (short) 0)),
@@ -222,13 +225,13 @@ public class FeatureControlManagerTest {
     @Test
     public void testApplyMetadataVersionChangeRecord() {
         QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
-                MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
+                MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
         FeatureControlManager manager = new FeatureControlManager.Builder().
             setQuorumFeatures(features).build();
         manager.replay(new FeatureLevelRecord().
             setName(MetadataVersion.FEATURE_NAME).
-            setFeatureLevel(MetadataVersion.IBP_3_0_IV0.featureLevel()));
-        assertEquals(MetadataVersion.IBP_3_0_IV0, manager.metadataVersion());
+            setFeatureLevel(MetadataVersion.IBP_3_0_IV1.featureLevel()));
+        assertEquals(MetadataVersion.IBP_3_0_IV1, manager.metadataVersion());
     }
 
     @Test
@@ -258,12 +261,12 @@ public class FeatureControlManagerTest {
         assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
 
         result = manager.updateFeatures(
-                Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV0.featureLevel()),
+                Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()),
                 Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
                 Collections.emptyMap(),
                 true);
         assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
-        assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 4-5",
+        assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 3-4",
             result.response().get(MetadataVersion.FEATURE_NAME).message());
     }
 
@@ -271,7 +274,7 @@ public class FeatureControlManagerTest {
     public void testCreateFeatureLevelRecords() {
         Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
         localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
-                MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
+                MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latest().featureLevel()));
         localSupportedFeatures.put("foo", VersionRange.of(0, 2));
         FeatureControlManager manager = new FeatureControlManager.Builder().
                 setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(), localSupportedFeatures, emptyList())).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 87afbf4199..1729d95312 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -96,6 +96,7 @@ import org.apache.kafka.snapshot.RecordsSnapshotReader;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.Mockito;
 
 import static java.util.function.Function.identity;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -214,7 +215,7 @@ public class QuorumControllerTest {
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
                 b.setConfigSchema(SCHEMA);
-            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), MetadataVersion.latest());
+            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -306,7 +307,7 @@ public class QuorumControllerTest {
             LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
                 b.setConfigSchema(SCHEMA);
-            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), MetadataVersion.latest());
+            }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), BootstrapMetadata.create(MetadataVersion.latest()));
         ) {
             ListenerCollection listeners = new ListenerCollection();
             listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -537,7 +538,7 @@ public class QuorumControllerTest {
         BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
         features.add(new BrokerRegistrationRequestData.Feature()
             .setName(MetadataVersion.FEATURE_NAME)
-            .setMinSupportedVersion(MetadataVersion.IBP_3_0_IV0.featureLevel())
+            .setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel())
             .setMaxSupportedVersion(MetadataVersion.latest().featureLevel()));
         return features;
     }
@@ -1179,4 +1180,21 @@ public class QuorumControllerTest {
                 "authorizer " + i + " should not have completed loading.");
         }
     }
+
+    @Test
+    public void testInvalidBootstrapMetadata() throws Exception {
+        // We can't actually create a BootstrapMetadata with an invalid version, so we have to fake it
+        BootstrapMetadata bootstrapMetadata = Mockito.mock(BootstrapMetadata.class);
+        Mockito.when(bootstrapMetadata.metadataVersion()).thenReturn(MetadataVersion.IBP_2_8_IV0);
+        try (
+                LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
+                QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
+                    b.setConfigSchema(SCHEMA);
+                }, OptionalLong.empty(), OptionalLong.empty(), bootstrapMetadata);
+        ) {
+            QuorumController active = controlEnv.activeController();
+            TestUtils.waitForCondition(() -> !active.isActive(),
+                "Timed out waiting for controller to renounce itself after bad bootstrap metadata version.");
+        }
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index e1481b9fe5..68b11ae3ec 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -49,7 +49,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         LocalLogManagerTestEnv logEnv,
         Consumer<QuorumController.Builder> builderConsumer
     ) throws Exception {
-        this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), MetadataVersion.latest());
+        this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
     }
 
     public QuorumControllerTestEnv(
@@ -57,7 +57,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         Consumer<Builder> builderConsumer,
         OptionalLong sessionTimeoutMillis,
         OptionalLong leaderImbalanceCheckIntervalNs,
-        MetadataVersion metadataVersion
+        BootstrapMetadata bootstrapMetadata
     ) throws Exception {
         this.logEnv = logEnv;
         int numControllers = logEnv.logManagers().size();
@@ -68,7 +68,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
             for (int i = 0; i < numControllers; i++) {
                 QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
                 builder.setRaftClient(logEnv.logManagers().get(i));
-                builder.setBootstrapMetadata(BootstrapMetadata.create(metadataVersion));
+                builder.setBootstrapMetadata(bootstrapMetadata);
                 builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
                 builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
                 sessionTimeoutMillis.ifPresent(timeout -> {
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index aa1a416c29..55916470cb 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.record.RecordVersion;
  * released version, they can use "0.10.0" when upgrading to the 0.10.0 release.
  */
 public enum MetadataVersion {
-    UNINITIALIZED(-1, "0.0", ""),
 
     IBP_0_8_0(-1, "0.8.0", ""),
     IBP_0_8_1(-1, "0.8.1", ""),
@@ -140,32 +139,36 @@ public enum MetadataVersion {
     IBP_2_8_IV1(-1, "2.8", "IV1"),
 
     // Introduce AllocateProducerIds (KIP-730)
-    IBP_3_0_IV0(1, "3.0", "IV0", true),
+    IBP_3_0_IV0(-1, "3.0", "IV0"),
 
     // Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
     // Assume message format version is 3.0 (KIP-724)
-    IBP_3_0_IV1(2, "3.0", "IV1", false),
+    IBP_3_0_IV1(1, "3.0", "IV1", true),
 
     // Adds topic IDs to Fetch requests/responses (KIP-516)
-    IBP_3_1_IV0(3, "3.1", "IV0", false),
+    IBP_3_1_IV0(2, "3.1", "IV0", false),
 
     // Support for leader recovery for unclean leader election (KIP-704)
-    IBP_3_2_IV0(4, "3.2", "IV0", true),
+    IBP_3_2_IV0(3, "3.2", "IV0", true),
 
     // Support for metadata.version feature flag and Removes min_version_level from the finalized version range that is written to ZooKeeper (KIP-778)
-    IBP_3_3_IV0(5, "3.3", "IV0", false),
+    IBP_3_3_IV0(4, "3.3", "IV0", false),
 
     // Support NoopRecord for the cluster metadata log (KIP-835)
-    IBP_3_3_IV1(6, "3.3", "IV1", true),
+    IBP_3_3_IV1(5, "3.3", "IV1", true),
 
     // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
-    IBP_3_3_IV2(7, "3.3", "IV2", true),
+    IBP_3_3_IV2(6, "3.3", "IV2", true),
 
     // Adds InControlledShutdown state to RegisterBrokerRecord and BrokerRegistrationChangeRecord (KIP-841).
-    IBP_3_3_IV3(8, "3.3", "IV3", true);
+    IBP_3_3_IV3(7, "3.3", "IV3", true);
 
+    // NOTE: update the default version in @ClusterTest annotation to point to the latest version
+    
     public static final String FEATURE_NAME = "metadata.version";
 
+    public static final MetadataVersion MINIMUM_KRAFT_VERSION = IBP_3_0_IV1;
+
     public static final MetadataVersion[] VERSIONS;
 
     private final short featureLevel;
@@ -258,12 +261,73 @@ public enum MetadataVersion {
         }
     }
 
+    public short fetchRequestVersion() {
+        if (this.isAtLeast(IBP_3_1_IV0)) {
+            return 13;
+        } else if (this.isAtLeast(IBP_2_7_IV1)) {
+            return 12;
+        } else if (this.isAtLeast(IBP_2_3_IV1)) {
+            return 11;
+        } else if (this.isAtLeast(IBP_2_1_IV2)) {
+            return 10;
+        } else if (this.isAtLeast(IBP_2_0_IV1)) {
+            return 8;
+        } else if (this.isAtLeast(IBP_1_1_IV0)) {
+            return 7;
+        } else if (this.isAtLeast(IBP_0_11_0_IV1)) {
+            return 5;
+        } else if (this.isAtLeast(IBP_0_11_0_IV0)) {
+            return 4;
+        } else if (this.isAtLeast(IBP_0_10_1_IV1)) {
+            return 3;
+        } else if (this.isAtLeast(IBP_0_10_0_IV0)) {
+            return 2;
+        } else if (this.isAtLeast(IBP_0_9_0)) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    public short offsetForLeaderEpochRequestVersion() {
+        if (this.isAtLeast(IBP_2_8_IV0)) {
+            return 4;
+        } else if (this.isAtLeast(IBP_2_3_IV1)) {
+            return 3;
+        } else if (this.isAtLeast(IBP_2_1_IV1)) {
+            return 2;
+        } else if (this.isAtLeast(IBP_2_0_IV0)) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    public short listOffsetRequestVersion() {
+        if (this.isAtLeast(IBP_3_0_IV1)) {
+            return 7;
+        } else if (this.isAtLeast(IBP_2_8_IV0)) {
+            return 6;
+        } else if (this.isAtLeast(IBP_2_2_IV1)) {
+            return 5;
+        } else if (this.isAtLeast(IBP_2_1_IV1)) {
+            return 4;
+        } else if (this.isAtLeast(IBP_2_0_IV1)) {
+            return 3;
+        } else if (this.isAtLeast(IBP_0_11_0_IV0)) {
+            return 2;
+        } else if (this.isAtLeast(IBP_0_10_1_IV2)) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
     private static final Map<String, MetadataVersion> IBP_VERSIONS;
     static {
         {
-            // Make a copy of values() and omit UNINITIALIZED
             MetadataVersion[] enumValues = MetadataVersion.values();
-            VERSIONS = Arrays.copyOfRange(enumValues, 1, enumValues.length);
+            VERSIONS = Arrays.copyOf(enumValues, enumValues.length);
 
             IBP_VERSIONS = new HashMap<>();
             Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
@@ -289,8 +353,8 @@ public enum MetadataVersion {
 
     Optional<MetadataVersion> previous() {
         int idx = this.ordinal();
-        if (idx > 1) {
-            return Optional.of(VERSIONS[idx - 2]);
+        if (idx > 0) {
+            return Optional.of(VERSIONS[idx - 1]);
         } else {
             return Optional.empty();
         }
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 2f96d5fd04..99f9cc3515 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -76,7 +76,7 @@ class MetadataVersionTest {
     @Test
     public void testFeatureLevel() {
         MetadataVersion[] metadataVersions = MetadataVersion.VERSIONS;
-        int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(IBP_3_0_IV0);
+        int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(MetadataVersion.MINIMUM_KRAFT_VERSION);
         for (int i = 0; i < firstFeatureLevelIndex; i++) {
             assertTrue(metadataVersions[i].featureLevel() < 0);
         }
@@ -287,7 +287,7 @@ class MetadataVersionTest {
     public void testPrevious() {
         for (int i = 1; i < MetadataVersion.VERSIONS.length - 2; i++) {
             MetadataVersion version = MetadataVersion.VERSIONS[i];
-            assertTrue(version.previous().isPresent());
+            assertTrue(version.previous().isPresent(), version.toString());
             assertEquals(MetadataVersion.VERSIONS[i - 1], version.previous().get());
         }
     }
@@ -317,8 +317,8 @@ class MetadataVersionTest {
         }
 
         for (MetadataVersion metadataVersion : MetadataVersion.VERSIONS) {
-            if (metadataVersion.isAtLeast(IBP_3_0_IV0)) {
-                assertTrue(metadataVersion.isKRaftSupported());
+            if (metadataVersion.isAtLeast(IBP_3_0_IV1)) {
+                assertTrue(metadataVersion.isKRaftSupported(), metadataVersion.toString());
             } else {
                 assertFalse(metadataVersion.isKRaftSupported());
             }