You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/05/25 18:02:52 UTC

[kafka] branch trunk updated: KAFKA-13833: Remove the min_version_level from the finalized version range written to ZooKeeper (#12062)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 54d60ced86 KAFKA-13833: Remove the min_version_level from the finalized version range written to ZooKeeper (#12062)
54d60ced86 is described below

commit 54d60ced869cbc31776f755c7f464ae2d2ee3e95
Author: dengziming <de...@gmail.com>
AuthorDate: Thu May 26 02:02:34 2022 +0800

    KAFKA-13833: Remove the min_version_level from the finalized version range written to ZooKeeper (#12062)
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../org/apache/kafka/common/feature/Features.java  |  28 ----
 .../common/feature/FinalizedVersionRange.java      |  53 --------
 .../common/feature/SupportedVersionRange.java      |  12 ++
 .../kafka/common/requests/ApiVersionsResponse.java |  23 ++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  17 +--
 .../apache/kafka/common/feature/FeaturesTest.java  |  54 +-------
 .../common/feature/FinalizedVersionRangeTest.java  |  80 ------------
 .../common/feature/SupportedVersionRangeTest.java  |  18 ++-
 .../common/requests/ApiVersionsResponseTest.java   |  13 +-
 .../scala/kafka/controller/KafkaController.scala   | 115 ++++++++---------
 .../scala/kafka/server/ApiVersionManager.scala     |   6 +-
 .../main/scala/kafka/server/BrokerFeatures.scala   |  29 ++---
 .../scala/kafka/server/FinalizedFeatureCache.scala |  27 ++--
 .../server/FinalizedFeatureChangeListener.scala    |   2 +-
 core/src/main/scala/kafka/zk/ZkData.scala          |  95 +++++++++-----
 .../src/test/scala/kafka/zk/FeatureZNodeTest.scala |  95 +++++++++++---
 .../controller/ControllerIntegrationTest.scala     |  20 ++-
 .../unit/kafka/server/BrokerFeaturesTest.scala     |  48 ++++---
 .../kafka/server/FinalizedFeatureCacheTest.scala   |  18 +--
 .../FinalizedFeatureChangeListenerTest.scala       |  60 ++++-----
 .../unit/kafka/server/UpdateFeaturesTest.scala     | 141 ++++++++++-----------
 .../kafka/server/common/MetadataVersion.java       |   2 +-
 .../kafka/server/common/MetadataVersionTest.java   |   7 +-
 23 files changed, 404 insertions(+), 559 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/feature/Features.java b/clients/src/main/java/org/apache/kafka/common/feature/Features.java
index 4006d71947..a39ee37e53 100644
--- a/clients/src/main/java/org/apache/kafka/common/feature/Features.java
+++ b/clients/src/main/java/org/apache/kafka/common/feature/Features.java
@@ -32,7 +32,6 @@ import static java.util.stream.Collectors.joining;
  *
  * @param <VersionRangeType> is the type of version range.
  * @see SupportedVersionRange
- * @see FinalizedVersionRange
  */
 public class Features<VersionRangeType extends BaseVersionRange> {
     private final Map<String, VersionRangeType> features;
@@ -57,20 +56,6 @@ public class Features<VersionRangeType extends BaseVersionRange> {
         return new Features<>(features);
     }
 
-    /**
-     * @param features   Map of feature name to FinalizedVersionRange.
-     *
-     * @return           Returns a new Features object representing finalized features.
-     */
-    public static Features<FinalizedVersionRange> finalizedFeatures(Map<String, FinalizedVersionRange> features) {
-        return new Features<>(features);
-    }
-
-    // Visible for testing.
-    public static Features<FinalizedVersionRange> emptyFinalizedFeatures() {
-        return new Features<>(new HashMap<>());
-    }
-
     public static Features<SupportedVersionRange> emptySupportedFeatures() {
         return new Features<>(new HashMap<>());
     }
@@ -138,19 +123,6 @@ public class Features<VersionRangeType extends BaseVersionRange> {
                 entry -> converter.fromMap(entry.getValue()))));
     }
 
-    /**
-     * Converts from a map to Features<FinalizedVersionRange>.
-     *
-     * @param featuresMap  the map representation of a Features<FinalizedVersionRange> object,
-     *                     generated using the toMap() API.
-     *
-     * @return             the Features<FinalizedVersionRange> object
-     */
-    public static Features<FinalizedVersionRange> fromFinalizedFeaturesMap(
-        Map<String, Map<String, Short>> featuresMap) {
-        return fromFeaturesMap(featuresMap, FinalizedVersionRange::fromMap);
-    }
-
     /**
      * Converts from a map to Features<SupportedVersionRange>.
      *
diff --git a/clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java b/clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
deleted file mode 100644
index 27e6440478..0000000000
--- a/clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.feature;
-
-import java.util.Map;
-
-/**
- * An extended {@link BaseVersionRange} representing the min/max versions for a finalized feature.
- */
-public class FinalizedVersionRange extends BaseVersionRange {
-    // Label for the min version key, that's used only to convert to/from a map.
-    private static final String MIN_VERSION_LEVEL_KEY_LABEL = "min_version_level";
-
-    // Label for the max version key, that's used only to convert to/from a map.
-    private static final String MAX_VERSION_LEVEL_KEY_LABEL = "max_version_level";
-
-    public FinalizedVersionRange(short minVersionLevel, short maxVersionLevel) {
-        super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel);
-    }
-
-    public static FinalizedVersionRange fromMap(Map<String, Short> versionRangeMap) {
-        return new FinalizedVersionRange(
-            BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, versionRangeMap),
-            BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, versionRangeMap));
-    }
-
-    /**
-     * Checks if the [min, max] version level range of this object does *NOT* fall within the
-     * [min, max] range of the provided SupportedVersionRange parameter.
-     *
-     * @param supportedVersionRange   the SupportedVersionRange to be checked
-     *
-     * @return                        - true, if the version levels are compatible
-     *                                - false otherwise
-     */
-    public boolean isIncompatibleWith(SupportedVersionRange supportedVersionRange) {
-        return min() < supportedVersionRange.min() || max() > supportedVersionRange.max();
-    }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java b/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
index 8993014a74..a864a91762 100644
--- a/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
@@ -41,4 +41,16 @@ public class SupportedVersionRange extends BaseVersionRange {
             BaseVersionRange.valueOrThrow(MIN_VERSION_KEY_LABEL, versionRangeMap),
             BaseVersionRange.valueOrThrow(MAX_VERSION_KEY_LABEL, versionRangeMap));
     }
+
+    /**
+     * Checks if the version level does *NOT* fall within the [min, max] range of this SupportedVersionRange.
+     *
+     * @param version   the version to be checked
+     *
+     * @return  - true, if the version levels are incompatible
+     *          - false otherwise
+     */
+    public boolean isIncompatibleWith(short version) {
+        return min() > version || max() < version;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 80d026330f..7c98eb2679 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.common.feature.Features;
-import org.apache.kafka.common.feature.FinalizedVersionRange;
 import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiMessageType.ListenerType;
@@ -35,6 +34,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordVersion;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -118,16 +118,15 @@ public class ApiVersionsResponse extends AbstractResponse {
             throttleTimeMs,
             apiVersions,
             Features.emptySupportedFeatures(),
-            Features.emptyFinalizedFeatures(),
-            UNKNOWN_FINALIZED_FEATURES_EPOCH
-        );
+            Collections.emptyMap(),
+            UNKNOWN_FINALIZED_FEATURES_EPOCH);
     }
 
     public static ApiVersionsResponse createApiVersionsResponse(
         int throttleTimeMs,
         RecordVersion minRecordVersion,
         Features<SupportedVersionRange> latestSupportedFeatures,
-        Features<FinalizedVersionRange> finalizedFeatures,
+        Map<String, Short> finalizedFeatures,
         long finalizedFeaturesEpoch,
         NodeApiVersions controllerApiVersions,
         ListenerType listenerType
@@ -153,7 +152,7 @@ public class ApiVersionsResponse extends AbstractResponse {
         int throttleTimeMs,
         ApiVersionCollection apiVersions,
         Features<SupportedVersionRange> latestSupportedFeatures,
-        Features<FinalizedVersionRange> finalizedFeatures,
+        Map<String, Short> finalizedFeatures,
         long finalizedFeaturesEpoch
     ) {
         return new ApiVersionsResponse(
@@ -233,7 +232,7 @@ public class ApiVersionsResponse extends AbstractResponse {
         final Errors error,
         final ApiVersionCollection apiKeys,
         final Features<SupportedVersionRange> latestSupportedFeatures,
-        final Features<FinalizedVersionRange> finalizedFeatures,
+        final Map<String, Short> finalizedFeatures,
         final long finalizedFeaturesEpoch
     ) {
         final ApiVersionsResponseData data = new ApiVersionsResponseData();
@@ -263,14 +262,14 @@ public class ApiVersionsResponse extends AbstractResponse {
     }
 
     private static FinalizedFeatureKeyCollection createFinalizedFeatureKeys(
-        Features<FinalizedVersionRange> finalizedFeatures) {
+        Map<String, Short> finalizedFeatures) {
         FinalizedFeatureKeyCollection converted = new FinalizedFeatureKeyCollection();
-        for (Map.Entry<String, FinalizedVersionRange> feature : finalizedFeatures.features().entrySet()) {
+        for (Map.Entry<String, Short> feature : finalizedFeatures.entrySet()) {
             final FinalizedFeatureKey key = new FinalizedFeatureKey();
-            final FinalizedVersionRange versionLevelRange = feature.getValue();
+            final short versionLevel = feature.getValue();
             key.setName(feature.getKey());
-            key.setMinVersionLevel(versionLevelRange.min());
-            key.setMaxVersionLevel(versionLevelRange.max());
+            key.setMinVersionLevel(versionLevel);
+            key.setMaxVersionLevel(versionLevel);
             converted.add(key);
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index eb4681856e..637c3d15cb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -545,7 +545,7 @@ public class KafkaAdminClientTest {
 
     private static FeatureMetadata defaultFeatureMetadata() {
         return new FeatureMetadata(
-            Utils.mkMap(Utils.mkEntry("test_feature_1", new FinalizedVersionRange((short) 2, (short) 3))),
+            Utils.mkMap(Utils.mkEntry("test_feature_1", new FinalizedVersionRange((short) 2, (short) 2))),
             Optional.of(1L),
             Utils.mkMap(Utils.mkEntry("test_feature_1", new SupportedVersionRange((short) 1, (short) 5))));
     }
@@ -563,26 +563,13 @@ public class KafkaAdminClientTest {
         return Features.supportedFeatures(featuresMap);
     }
 
-    private static Features<org.apache.kafka.common.feature.FinalizedVersionRange> convertFinalizedFeaturesMap(Map<String, FinalizedVersionRange> features) {
-        final Map<String, org.apache.kafka.common.feature.FinalizedVersionRange> featuresMap = new HashMap<>();
-        for (final Map.Entry<String, FinalizedVersionRange> entry : features.entrySet()) {
-            final FinalizedVersionRange versionRange = entry.getValue();
-            featuresMap.put(
-                entry.getKey(),
-                new org.apache.kafka.common.feature.FinalizedVersionRange(
-                    versionRange.minVersionLevel(), versionRange.maxVersionLevel()));
-        }
-
-        return Features.finalizedFeatures(featuresMap);
-    }
-
     private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures(Errors error) {
         if (error == Errors.NONE) {
             return ApiVersionsResponse.createApiVersionsResponse(
                 0,
                 ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
                 convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
-                convertFinalizedFeaturesMap(defaultFeatureMetadata().finalizedFeatures()),
+                Collections.singletonMap("test_feature_1", (short) 2),
                 defaultFeatureMetadata().finalizedFeaturesEpoch().get()
             );
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java b/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
index 88b3471208..0b2bc4f50a 100644
--- a/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.Test;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,11 +36,6 @@ public class FeaturesTest {
     public void testEmptyFeatures() {
         Map<String, Map<String, Short>> emptyMap = new HashMap<>();
 
-        Features<FinalizedVersionRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures();
-        assertTrue(emptyFinalizedFeatures.features().isEmpty());
-        assertTrue(emptyFinalizedFeatures.toMap().isEmpty());
-        assertEquals(emptyFinalizedFeatures, Features.fromFinalizedFeaturesMap(emptyMap));
-
         Features<SupportedVersionRange> emptySupportedFeatures = Features.emptySupportedFeatures();
         assertTrue(emptySupportedFeatures.features().isEmpty());
         assertTrue(emptySupportedFeatures.toMap().isEmpty());
@@ -49,9 +44,6 @@ public class FeaturesTest {
 
     @Test
     public void testNullFeatures() {
-        assertThrows(
-            NullPointerException.class,
-            () -> Features.finalizedFeatures(null));
         assertThrows(
             NullPointerException.class,
             () -> Features.supportedFeatures(null));
@@ -93,34 +85,6 @@ public class FeaturesTest {
         assertEquals(features, Features.fromSupportedFeaturesMap(expected));
     }
 
-    @Test
-    public void testFromToFinalizedFeaturesMap() {
-        FinalizedVersionRange v1 = new FinalizedVersionRange((short) 1, (short) 2);
-        FinalizedVersionRange v2 = new FinalizedVersionRange((short) 3, (short) 4);
-        Map<String, FinalizedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
-
-        Features<FinalizedVersionRange> features = Features.finalizedFeatures(allFeatures);
-
-        Map<String, Map<String, Short>> expected = mkMap(
-            mkEntry("feature_1", mkMap(mkEntry("min_version_level", (short) 1), mkEntry("max_version_level", (short) 2))),
-            mkEntry("feature_2", mkMap(mkEntry("min_version_level", (short) 3), mkEntry("max_version_level", (short) 4))));
-        assertEquals(expected, features.toMap());
-        assertEquals(features, Features.fromFinalizedFeaturesMap(expected));
-    }
-
-    @Test
-    public void testToStringFinalizedFeatures() {
-        FinalizedVersionRange v1 = new FinalizedVersionRange((short) 1, (short) 2);
-        FinalizedVersionRange v2 = new FinalizedVersionRange((short) 3, (short) 4);
-        Map<String, FinalizedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
-
-        Features<FinalizedVersionRange> features = Features.finalizedFeatures(allFeatures);
-
-        assertEquals(
-            "Features{(feature_1 -> FinalizedVersionRange[min_version_level:1, max_version_level:2]), (feature_2 -> FinalizedVersionRange[min_version_level:3, max_version_level:4])}",
-            features.toString());
-    }
-
     @Test
     public void testToStringSupportedFeatures() {
         SupportedVersionRange v1 = new SupportedVersionRange((short) 1, (short) 2);
@@ -145,29 +109,19 @@ public class FeaturesTest {
             () -> Features.fromSupportedFeaturesMap(invalidFeatures));
     }
 
-    @Test
-    public void testFinalizedFeaturesFromMapFailureWithInvalidMissingMaxVersionLevel() {
-        // This is invalid because 'max_version_level' key is missing.
-        Map<String, Map<String, Short>> invalidFeatures = mkMap(
-            mkEntry("feature_1", mkMap(mkEntry("min_version_level", (short) 1))));
-        assertThrows(
-            IllegalArgumentException.class,
-            () -> Features.fromFinalizedFeaturesMap(invalidFeatures));
-    }
-
     @Test
     public void testEquals() {
         SupportedVersionRange v1 = new SupportedVersionRange((short) 1, (short) 2);
         Map<String, SupportedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1));
         Features<SupportedVersionRange> features = Features.supportedFeatures(allFeatures);
         Features<SupportedVersionRange> featuresClone = Features.supportedFeatures(allFeatures);
-        assertTrue(features.equals(featuresClone));
+        assertEquals(features, featuresClone);
 
         SupportedVersionRange v2 = new SupportedVersionRange((short) 1, (short) 3);
         Map<String, SupportedVersionRange> allFeaturesDifferent = mkMap(mkEntry("feature_1", v2));
         Features<SupportedVersionRange> featuresDifferent = Features.supportedFeatures(allFeaturesDifferent);
-        assertFalse(features.equals(featuresDifferent));
+        assertNotEquals(features, featuresDifferent);
 
-        assertFalse(features.equals(null));
+        assertNotEquals(null, features);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java b/clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java
deleted file mode 100644
index 989c4bd1a5..0000000000
--- a/clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.feature;
-
-import java.util.Map;
-
-import org.junit.jupiter.api.Test;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Unit tests for the FinalizedVersionRange class.
- *
- * Most of the unit tests required for BaseVersionRange are part of the SupportedVersionRangeTest
- * suite. This suite only tests behavior very specific to FinalizedVersionRange.
- */
-public class FinalizedVersionRangeTest {
-
-    @Test
-    public void testFromToMap() {
-        FinalizedVersionRange versionRange = new FinalizedVersionRange((short) 1, (short) 2);
-        assertEquals(1, versionRange.min());
-        assertEquals(2, versionRange.max());
-
-        Map<String, Short> versionRangeMap = versionRange.toMap();
-        assertEquals(
-            mkMap(
-                mkEntry("min_version_level", versionRange.min()),
-                mkEntry("max_version_level", versionRange.max())),
-            versionRangeMap);
-
-        FinalizedVersionRange newVersionRange = FinalizedVersionRange.fromMap(versionRangeMap);
-        assertEquals(1, newVersionRange.min());
-        assertEquals(2, newVersionRange.max());
-        assertEquals(versionRange, newVersionRange);
-    }
-
-    @Test
-    public void testToString() {
-        assertEquals("FinalizedVersionRange[min_version_level:1, max_version_level:1]", new FinalizedVersionRange((short) 1, (short) 1).toString());
-        assertEquals("FinalizedVersionRange[min_version_level:1, max_version_level:2]", new FinalizedVersionRange((short) 1, (short) 2).toString());
-    }
-
-    @Test
-    public void testIsCompatibleWith() {
-        assertFalse(new FinalizedVersionRange((short) 1, (short) 1).isIncompatibleWith(new SupportedVersionRange((short) 1, (short) 1)));
-        assertFalse(new FinalizedVersionRange((short) 2, (short) 3).isIncompatibleWith(new SupportedVersionRange((short) 1, (short) 4)));
-        assertFalse(new FinalizedVersionRange((short) 1, (short) 4).isIncompatibleWith(new SupportedVersionRange((short) 1, (short) 4)));
-
-        assertTrue(new FinalizedVersionRange((short) 1, (short) 4).isIncompatibleWith(new SupportedVersionRange((short) 2, (short) 3)));
-        assertTrue(new FinalizedVersionRange((short) 1, (short) 4).isIncompatibleWith(new SupportedVersionRange((short) 2, (short) 4)));
-        assertTrue(new FinalizedVersionRange((short) 2, (short) 4).isIncompatibleWith(new SupportedVersionRange((short) 2, (short) 3)));
-    }
-
-    @Test
-    public void testMinMax() {
-        FinalizedVersionRange versionRange = new FinalizedVersionRange((short) 1, (short) 2);
-        assertEquals(1, versionRange.min());
-        assertEquals(2, versionRange.max());
-    }
-}
diff --git a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
index acf452d820..a1d2af419f 100644
--- a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
@@ -25,6 +25,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -128,9 +129,9 @@ public class SupportedVersionRangeTest {
     @Test
     public void testEquals() {
         SupportedVersionRange tested = new SupportedVersionRange((short) 1, (short) 1);
-        assertTrue(tested.equals(tested));
-        assertFalse(tested.equals(new SupportedVersionRange((short) 1, (short) 2)));
-        assertFalse(tested.equals(null));
+        assertEquals(tested, tested);
+        assertNotEquals(tested, new SupportedVersionRange((short) 1, (short) 2));
+        assertNotEquals(null, tested);
     }
 
     @Test
@@ -139,4 +140,15 @@ public class SupportedVersionRangeTest {
         assertEquals(1, versionRange.min());
         assertEquals(2, versionRange.max());
     }
+
+    @Test
+    public void testIsIncompatibleWith() {
+        assertFalse(new SupportedVersionRange((short) 1, (short) 1).isIncompatibleWith((short) 1));
+        assertFalse(new SupportedVersionRange((short) 1, (short) 4).isIncompatibleWith((short) 2));
+        assertFalse(new SupportedVersionRange((short) 1, (short) 4).isIncompatibleWith((short) 1));
+        assertFalse(new SupportedVersionRange((short) 1, (short) 4).isIncompatibleWith((short) 4));
+
+        assertTrue(new SupportedVersionRange((short) 2, (short) 3).isIncompatibleWith((short) 1));
+        assertTrue(new SupportedVersionRange((short) 2, (short) 3).isIncompatibleWith((short) 4));
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index c3fa6b892e..62571c6986 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.kafka.common.requests;
 
+import java.util.Collections;
 import java.util.HashSet;
 import org.apache.kafka.common.feature.Features;
-import org.apache.kafka.common.feature.FinalizedVersionRange;
 import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.message.ApiMessageType.ListenerType;
@@ -116,7 +116,7 @@ public class ApiVersionsResponseTest {
             10,
             RecordVersion.V1,
             Features.emptySupportedFeatures(),
-            Features.emptyFinalizedFeatures(),
+            Collections.emptyMap(),
             ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
             null,
             ListenerType.ZK_BROKER
@@ -135,8 +135,7 @@ public class ApiVersionsResponseTest {
             RecordVersion.V1,
             Features.supportedFeatures(
                 Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange((short) 1, (short) 4)))),
-            Features.finalizedFeatures(
-                Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange((short) 2, (short) 3)))),
+            Utils.mkMap(Utils.mkEntry("feature", (short) 3)),
             10L,
             null,
             ListenerType.ZK_BROKER
@@ -152,7 +151,7 @@ public class ApiVersionsResponseTest {
         assertEquals(1, response.data().finalizedFeatures().size());
         FinalizedFeatureKey fKey = response.data().finalizedFeatures().find("feature");
         assertNotNull(fKey);
-        assertEquals(2, fKey.minVersionLevel());
+        assertEquals(3, fKey.minVersionLevel());
         assertEquals(3, fKey.maxVersionLevel());
         assertEquals(10, response.data().finalizedFeaturesEpoch());
     }
@@ -163,7 +162,7 @@ public class ApiVersionsResponseTest {
             AbstractResponse.DEFAULT_THROTTLE_TIME,
             RecordVersion.current(),
             Features.emptySupportedFeatures(),
-            Features.emptyFinalizedFeatures(),
+            Collections.emptyMap(),
             ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
             null,
             ListenerType.ZK_BROKER
@@ -181,7 +180,7 @@ public class ApiVersionsResponseTest {
             AbstractResponse.DEFAULT_THROTTLE_TIME,
             RecordVersion.current(),
             Features.emptySupportedFeatures(),
-            Features.emptyFinalizedFeatures(),
+            Collections.emptyMap(),
             ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
             null,
             ListenerType.ZK_BROKER
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f4a8569b80..b753cd408b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -38,7 +38,6 @@ import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
 import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterPartitionRequestData, AlterPartitionResponseData}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -309,7 +308,7 @@ class KafkaController(val config: KafkaConfig,
    * This method enables the feature versioning system (KIP-584).
    *
    * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
-   * a name and a range of version numbers. A feature can be of two types:
+   * a name and a range of version numbers or a version number. A feature can be of two types:
    *
    * 1. Supported feature:
    * A supported feature is represented by a name (string) and a range of versions (defined by a
@@ -320,8 +319,8 @@ class KafkaController(val config: KafkaConfig,
    * range of versions.
    *
    * 2. Finalized feature:
-   * A finalized feature is represented by a name (string) and a range of version levels (defined
-   * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is
+   * A finalized feature is represented by a name (string) and a specified version level (defined
+   * by a Short). Whenever the feature versioning system (KIP-584) is
    * enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
    * In comparison to a supported feature, the key difference is that a finalized feature exists
    * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
@@ -385,21 +384,24 @@ class KafkaController(val config: KafkaConfig,
   private def enableFeatureVersioning(): Unit = {
     val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
     if (version == ZkVersion.UnknownVersion) {
-      val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled,
-                                          brokerFeatures.defaultFinalizedFeatures))
+      val newVersion = createFeatureZNode(
+        FeatureZNode(config.interBrokerProtocolVersion,
+          FeatureZNodeStatus.Enabled,
+          brokerFeatures.defaultFinalizedFeatures
+        ))
       featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
     } else {
       val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
       val newFeatures = existingFeatureZNode.status match {
         case FeatureZNodeStatus.Enabled => existingFeatureZNode.features
         case FeatureZNodeStatus.Disabled =>
-          if (!existingFeatureZNode.features.empty()) {
+          if (existingFeatureZNode.features.nonEmpty) {
             warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" +
                  s" contains non-empty features: ${existingFeatureZNode.features}")
           }
-          Features.emptyFinalizedFeatures
+          Map.empty[String, Short]
       }
-      val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures)
+      val newFeatureZNode = FeatureZNode(config.interBrokerProtocolVersion, FeatureZNodeStatus.Enabled, newFeatures)
       if (!newFeatureZNode.equals(existingFeatureZNode)) {
         val newVersion = updateFeatureZNode(newFeatureZNode)
         featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
@@ -423,14 +425,14 @@ class KafkaController(val config: KafkaConfig,
    *    are disabled when IBP config is < than IBP_2_7_IV0.
    */
   private def disableFeatureVersioning(): Unit = {
-    val newNode = FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures())
+    val newNode = FeatureZNode(config.interBrokerProtocolVersion, FeatureZNodeStatus.Disabled, Map.empty[String, Short])
     val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
     if (version == ZkVersion.UnknownVersion) {
       createFeatureZNode(newNode)
     } else {
       val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
       if (existingFeatureZNode.status == FeatureZNodeStatus.Disabled &&
-          !existingFeatureZNode.features.empty()) {
+          existingFeatureZNode.features.nonEmpty) {
         warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" +
              s" contains non-empty features: ${existingFeatureZNode.features}")
       }
@@ -1087,7 +1089,7 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  private def registerPartitionModificationsHandlers(topics: Seq[String]) = {
+  private def registerPartitionModificationsHandlers(topics: Seq[String]): Unit = {
     topics.foreach { topic =>
       val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic)
       partitionModificationsHandlers.put(topic, partitionModificationsHandler)
@@ -1095,7 +1097,7 @@ class KafkaController(val config: KafkaConfig,
     partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
   }
 
-  private[controller] def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
+  private[controller] def unregisterPartitionModificationsHandlers(topics: Seq[String]): Unit = {
     topics.foreach { topic =>
       partitionModificationsHandlers.remove(topic).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
     }
@@ -1907,16 +1909,16 @@ class KafkaController(val config: KafkaConfig,
   }
 
   /**
-   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * Returns the new finalized version for the feature, if there are no feature
    * incompatibilities seen with all known brokers for the provided feature update.
    * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
    *
    * @param update   the feature update to be processed (this can not be meant to delete the feature)
    *
-   * @return         the new FinalizedVersionRange or error, as described above.
+   * @return         the new finalized version or error, as described above.
    */
-  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequest.FeatureUpdateItem):
-      Either[FinalizedVersionRange, ApiError] = {
+  private def newFinalizedVersionOrIncompatibilityError(update: UpdateFeaturesRequest.FeatureUpdateItem):
+      Either[Short, ApiError] = {
     if (update.isDeleteRequest) {
       throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
     }
@@ -1927,28 +1929,19 @@ class KafkaController(val config: KafkaConfig,
                          "Could not apply finalized feature update because the provided feature" +
                          " is not supported."))
     } else {
-      var newVersionRange: FinalizedVersionRange = null
-      try {
-        newVersionRange = new FinalizedVersionRange(update.versionLevel(), update.versionLevel())
-      } catch {
-        case _: IllegalArgumentException => {
-          // This exception means the provided maxVersionLevel is invalid. It is handled below
-          // outside of this catch clause.
-        }
-      }
-      if (newVersionRange == null) {
+      val newVersion = update.versionLevel()
+      if (supportedVersionRange.isIncompatibleWith(newVersion)) {
         Right(new ApiError(Errors.INVALID_REQUEST,
           "Could not apply finalized feature update because the provided" +
-          s" maxVersionLevel:${update.versionLevel} is lower than the" +
+          s" versionLevel:${update.versionLevel} is lower than the" +
           s" supported minVersion:${supportedVersionRange.min}."))
       } else {
-        val newFinalizedFeature =
-          Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange)))
+        val newFinalizedFeature = Utils.mkMap(Utils.mkEntry(update.feature, newVersion)).asScala.toMap
         val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
           BrokerFeatures.hasIncompatibleFeatures(broker.features, newFinalizedFeature)
         })
         if (numIncompatibleBrokers == 0) {
-          Left(newVersionRange)
+          Left(newVersion)
         } else {
           Right(new ApiError(Errors.INVALID_REQUEST,
                              "Could not apply finalized feature update because" +
@@ -1959,24 +1952,24 @@ class KafkaController(val config: KafkaConfig,
   }
 
   /**
-   * Validates a feature update on an existing FinalizedVersionRange.
+   * Validates a feature update on an existing finalized version.
    * If the validation succeeds, then, the return value contains:
-   * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature.
+   * 1. the new finalized version for the feature, if the feature update was not meant to delete the feature.
    * 2. Option.empty, if the feature update was meant to delete the feature.
    *
    * If the validation fails, then returned value contains a suitable ApiError.
    *
-   * @param update                 the feature update to be processed.
-   * @param existingVersionRange   the existing FinalizedVersionRange which can be empty when no
-   *                               FinalizedVersionRange exists for the associated feature
+   * @param update             the feature update to be processed.
+   * @param existingVersion    the existing finalized version which can be empty when no
+   *                           finalized version exists for the associated feature
    *
-   * @return                       the new FinalizedVersionRange to be updated into ZK or error
-   *                               as described above.
+   * @return                   the new finalized version to be updated into ZK or error
+   *                           as described above.
    */
   private def validateFeatureUpdate(update: UpdateFeaturesRequest.FeatureUpdateItem,
-                                    existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = {
-    def newVersionRangeOrError(update: UpdateFeaturesRequest.FeatureUpdateItem): Either[Option[FinalizedVersionRange], ApiError] = {
-      newFinalizedVersionRangeOrIncompatibilityError(update)
+                                    existingVersion: Option[Short]): Either[Option[Short], ApiError] = {
+    def newVersionRangeOrError(update: UpdateFeaturesRequest.FeatureUpdateItem): Either[Option[Short], ApiError] = {
+      newFinalizedVersionOrIncompatibilityError(update)
         .fold(versionRange => Left(Some(versionRange)), error => Right(error))
     }
 
@@ -1989,7 +1982,7 @@ class KafkaController(val config: KafkaConfig,
 
       // We handle deletion requests separately from non-deletion requests.
       if (update.isDeleteRequest) {
-        if (existingVersionRange.isEmpty) {
+        if (existingVersion.isEmpty) {
           // Disallow deletion of a non-existing finalized feature.
           Right(new ApiError(Errors.INVALID_REQUEST,
                              "Can not delete non-existing finalized feature."))
@@ -1999,30 +1992,30 @@ class KafkaController(val config: KafkaConfig,
       } else if (update.versionLevel() < 1) {
         // Disallow deletion of a finalized feature without SAFE downgrade type.
         Right(new ApiError(Errors.INVALID_REQUEST,
-                           s"Can not provide maxVersionLevel: ${update.versionLevel} less" +
+                           s"Can not provide versionLevel: ${update.versionLevel} less" +
                            s" than 1 without setting the SAFE downgradeType in the request."))
       } else {
-        existingVersionRange.map(existing =>
-          if (update.versionLevel == existing.max) {
-            // Disallow a case where target maxVersionLevel matches existing maxVersionLevel.
+        existingVersion.map(existing =>
+          if (update.versionLevel == existing) {
+            // Disallow a case where target versionLevel matches existing versionLevel.
             Right(new ApiError(Errors.INVALID_REQUEST,
                                s"Can not ${if (update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) "downgrade" else "upgrade"}" +
-                               s" a finalized feature from existing maxVersionLevel:${existing.max}" +
+                               s" a finalized feature from existing versionLevel:$existing" +
                                " to the same value."))
-          } else if (update.versionLevel < existing.max && !update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) {
+          } else if (update.versionLevel < existing && !update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) {
             // Disallow downgrade of a finalized feature without the downgradeType set.
             Right(new ApiError(Errors.INVALID_REQUEST,
                                s"Can not downgrade finalized feature from existing" +
-                               s" maxVersionLevel:${existing.max} to provided" +
-                               s" maxVersionLevel:${update.versionLevel} without setting the" +
+                               s" versionLevel:$existing to provided" +
+                               s" versionLevel:${update.versionLevel} without setting the" +
                                " downgradeType to SAFE in the request."))
-          } else if (!update.upgradeType.equals(UpgradeType.UPGRADE) && update.versionLevel > existing.max) {
+          } else if (!update.upgradeType.equals(UpgradeType.UPGRADE) && update.versionLevel > existing) {
             // Disallow a request that sets downgradeType without specifying a
-            // maxVersionLevel that's lower than the existing maxVersionLevel.
+            // versionLevel that's lower than the existing versionLevel.
             Right(new ApiError(Errors.INVALID_REQUEST,
-                               s"When the downgradeType is set to SAFE set in the request, the provided" +
-                               s" maxVersionLevel:${update.versionLevel} can not be greater than" +
-                               s" existing maxVersionLevel:${existing.max}."))
+                               s"When the downgradeType is set to SAFE in the request, the provided" +
+                               s" versionLevel:${update.versionLevel} can not be greater than" +
+                               s" existing versionLevel:$existing."))
           } else {
             newVersionRangeOrError(update)
           }
@@ -2044,11 +2037,11 @@ class KafkaController(val config: KafkaConfig,
                                                         callback: UpdateFeaturesCallback): Unit = {
     val updates = request.featureUpdates
     val existingFeatures = featureCache.get
-      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
-      .getOrElse(Map[String, FinalizedVersionRange]())
-    // A map with key being feature name and value being FinalizedVersionRange.
+      .map(featuresAndEpoch => featuresAndEpoch.features)
+      .getOrElse(Map[String, Short]())
+    // A map with key being feature name and value being finalized version.
     // This contains the target features to be eventually written to FeatureZNode.
-    val targetFeatures = scala.collection.mutable.Map[String, FinalizedVersionRange]() ++ existingFeatures
+    val targetFeatures = scala.collection.mutable.Map[String, Short]() ++ existingFeatures
     // A map with key being feature name and value being error encountered when the FeatureUpdate
     // was applied.
     val errors = scala.collection.mutable.Map[String, ApiError]()
@@ -2057,7 +2050,7 @@ class KafkaController(val config: KafkaConfig,
     //  - If a FeatureUpdate is found to be valid, then:
     //    - The corresponding entry in errors map would be updated to contain Errors.NONE.
     //    - If the FeatureUpdate is an add or update request, then the targetFeatures map is updated
-    //      to contain the new FinalizedVersionRange for the feature.
+    //      to contain the new finalized version for the feature.
     //    - Otherwise if the FeatureUpdate is a delete request, then the feature is removed from the
     //      targetFeatures map.
     //  - Otherwise if a FeatureUpdate is found to be invalid, then:
@@ -2082,7 +2075,7 @@ class KafkaController(val config: KafkaConfig,
     // of the existing finalized features in ZK.
     try {
       if (!existingFeatures.equals(targetFeatures)) {
-        val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, Features.finalizedFeatures(targetFeatures.asJava))
+        val newNode = FeatureZNode(config.interBrokerProtocolVersion, FeatureZNodeStatus.Enabled, targetFeatures)
         val newVersion = updateFeatureZNode(newNode)
         featureCache.waitUntilEpochOrThrow(newVersion, request.data().timeoutMs())
       }
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 61a59ec2ce..ccb13c75d5 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -18,12 +18,12 @@ package kafka.server
 
 import kafka.network
 import kafka.network.RequestChannel
-import org.apache.kafka.common.feature.Features
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.server.common.MetadataVersion
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 trait ApiVersionManager {
@@ -86,7 +86,7 @@ class DefaultApiVersionManager(
         throttleTimeMs,
         interBrokerProtocolVersion.highestSupportedRecordVersion,
         supportedFeatures,
-        finalizedFeatures.features,
+        finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
         finalizedFeatures.epoch,
         controllerApiVersions.orNull,
         listenerType)
@@ -94,7 +94,7 @@ class DefaultApiVersionManager(
         throttleTimeMs,
         interBrokerProtocolVersion.highestSupportedRecordVersion,
         supportedFeatures,
-        Features.emptyFinalizedFeatures,
+        Collections.emptyMap(),
         ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
         controllerApiVersions.orNull,
         listenerType)
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index ff7e60908c..d385f1eb07 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.utils.Logging
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.apache.kafka.server.common.MetadataVersion
 
 import java.util
@@ -42,12 +42,10 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
    * Returns the default finalized features that a new Kafka cluster with IBP config >= IBP_2_7_IV0
    * needs to be bootstrapped with.
    */
-  def defaultFinalizedFeatures: Features[FinalizedVersionRange] = {
-    Features.finalizedFeatures(
-      supportedFeatures.features.asScala.map {
-        case(name, versionRange) => (
-          name, new FinalizedVersionRange(versionRange.max, versionRange.max))
-      }.asJava)
+  def defaultFinalizedFeatures: Map[String, Short] = {
+    supportedFeatures.features.asScala.map {
+      case(name, versionRange) => (name, versionRange.max)
+    }.toMap
   }
 
   /**
@@ -65,7 +63,7 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
    * @return            The subset of input features which are incompatible. If the returned object
    *                    is empty, it means there were no feature incompatibilities found.
    */
-  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+  def incompatibleFeatures(finalized: Map[String, Short]): Map[String, Short] = {
     BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, logIncompatibilities = true)
   }
 }
@@ -93,19 +91,19 @@ object BrokerFeatures extends Logging {
    *                            - False otherwise.
    */
   def hasIncompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
-                              finalizedFeatures: Features[FinalizedVersionRange]): Boolean = {
-    !incompatibleFeatures(supportedFeatures, finalizedFeatures, logIncompatibilities = false).empty
+                              finalizedFeatures: Map[String, Short]): Boolean = {
+    incompatibleFeatures(supportedFeatures, finalizedFeatures, logIncompatibilities = false).nonEmpty
   }
 
   private def incompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
-                                   finalizedFeatures: Features[FinalizedVersionRange],
-                                   logIncompatibilities: Boolean): Features[FinalizedVersionRange] = {
-    val incompatibleFeaturesInfo = finalizedFeatures.features.asScala.map {
+                                   finalizedFeatures: Map[String, Short],
+                                   logIncompatibilities: Boolean): Map[String, Short] = {
+    val incompatibleFeaturesInfo = finalizedFeatures.map {
       case (feature, versionLevels) =>
         val supportedVersions = supportedFeatures.get(feature)
         if (supportedVersions == null) {
           (feature, versionLevels, "{feature=%s, reason='Unsupported feature'}".format(feature))
-        } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+        } else if (supportedVersions.isIncompatibleWith(versionLevels)) {
           (feature, versionLevels, "{feature=%s, reason='%s is incompatible with %s'}".format(
             feature, versionLevels, supportedVersions))
         } else {
@@ -117,7 +115,6 @@ object BrokerFeatures extends Logging {
       warn("Feature incompatibilities seen: " +
            incompatibleFeaturesInfo.map { case(_, _, errorReason) => errorReason }.mkString(", "))
     }
-    Features.finalizedFeatures(
-      incompatibleFeaturesInfo.map { case(feature, versionLevels, _) => (feature, versionLevels) }.toMap.asJava)
+    incompatibleFeaturesInfo.map { case(feature, versionLevels, _) => (feature, versionLevels) }.toMap
   }
 }
diff --git a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
index 390110dba0..d414e7671a 100644
--- a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
+++ b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
@@ -17,23 +17,22 @@
 
 package kafka.server
 
-import java.util
-import java.util.Collections
 import kafka.utils.Logging
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
 import org.apache.kafka.image.FeaturesDelta
 import org.apache.kafka.server.common.MetadataVersion
 
+import java.util
+import scala.compat.java8.OptionConverters._
 import scala.concurrent.TimeoutException
+import scala.jdk.CollectionConverters._
 import scala.math.max
-import scala.compat.java8.OptionConverters._
 
 // Raised whenever there was an error in updating the FinalizedFeatureCache with features.
 class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
 }
 
 // Helper class that represents finalized features along with an epoch value.
-case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Long) {
+case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) {
   override def toString(): String = {
     s"FinalizedFeaturesAndEpoch(features=$features, epoch=$epoch)"
   }
@@ -107,7 +106,7 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
    *                         supported features. In such a case, the existing cache contents are
    *                         not modified.
    */
-  def updateOrThrow(latestFeatures: Features[FinalizedVersionRange], latestEpoch: Long): Unit = {
+  def updateOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
     val latest = FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)
     val existing = featuresAndEpoch.map(item => item.toString()).getOrElse("<empty>")
     if (featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch > latest.epoch) {
@@ -116,7 +115,7 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
       throw new FeatureCacheUpdateException(errorMsg)
     } else {
       val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features)
-      if (!incompatibleFeatures.empty) {
+      if (incompatibleFeatures.nonEmpty) {
         val errorMsg = "FinalizedFeatureCache update failed since feature compatibility" +
           s" checks failed! Supported ${brokerFeatures.supportedFeatures} has incompatibilities" +
           s" with the latest $latest."
@@ -134,21 +133,19 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
 
   def update(featuresDelta: FeaturesDelta, highestMetadataOffset: Long): Unit = {
     val features = featuresAndEpoch.getOrElse(
-      FinalizedFeaturesAndEpoch(Features.emptyFinalizedFeatures(), -1))
-    val newFeatures = new util.HashMap[String, FinalizedVersionRange]()
-    newFeatures.putAll(features.features.features())
+      FinalizedFeaturesAndEpoch(Map.empty, -1))
+    val newFeatures = new util.HashMap[String, Short]()
+    newFeatures.putAll(features.features.asJava)
     featuresDelta.changes().entrySet().forEach { e =>
       e.getValue.asScala match {
         case None => newFeatures.remove(e.getKey)
-        case Some(version) => newFeatures.put(e.getKey,
-          new FinalizedVersionRange(version, version))
+        case Some(version) => newFeatures.put(e.getKey, version)
       }
     }
     featuresDelta.metadataVersionChange().ifPresent { metadataVersion =>
-      newFeatures.put(MetadataVersion.FEATURE_NAME, new FinalizedVersionRange(metadataVersion.featureLevel(), metadataVersion.featureLevel()))
+      newFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel())
     }
-    featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(Features.finalizedFeatures(
-      Collections.unmodifiableMap(newFeatures)), highestMetadataOffset))
+    featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(newFeatures.asScala.toMap, highestMetadataOffset))
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
index 8f10ab661a..04f138ba7b 100644
--- a/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
+++ b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
@@ -105,7 +105,7 @@ class FinalizedFeatureChangeListener(private val finalizedFeatureCache: Finalize
               finalizedFeatureCache.clear()
             }
             case FeatureZNodeStatus.Enabled => {
-              finalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+              finalizedFeatureCache.updateOrThrow(featureZNode.features.toMap, version)
             }
             case _ => throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
           }
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 9733e9c83d..7006a21f94 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -33,7 +33,7 @@ import kafka.utils.Json
 import kafka.utils.json.JsonObject
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.feature.Features._
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -880,20 +880,37 @@ object FeatureZNodeStatus {
 /**
  * Represents the contents of the ZK node containing finalized feature information.
  *
+ * @param version    the version of ZK node, we removed min_version_level in version 2
  * @param status     the status of the ZK node
  * @param features   the cluster-wide finalized features
  */
-case class FeatureZNode(status: FeatureZNodeStatus, features: Features[FinalizedVersionRange]) {
+case class FeatureZNode(version: Int, status: FeatureZNodeStatus, features: Map[String, Short]) {
 }
 
 object FeatureZNode {
   private val VersionKey = "version"
   private val StatusKey = "status"
   private val FeaturesKey = "features"
+  private val V1MinVersionKey = "min_version_level"
+  private val V1MaxVersionKey = "max_version_level"
 
   // V1 contains 'version', 'status' and 'features' keys.
   val V1 = 1
-  val CurrentVersion = V1
+  // V2 removes min_version_level
+  val V2 = 2
+
+  /**
+   * - Create a feature info with v1 json format if if the metadataVersion is before 3.2.0
+   * - Create a feature znode with v2 json format if the metadataVersion is 3.2.1 or above.
+   */
+  def apply(metadataVersion: MetadataVersion, status: FeatureZNodeStatus, features: Map[String, Short]): FeatureZNode = {
+    val version = if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
+      V2
+    } else {
+      V1
+    }
+    FeatureZNode(version, status, features)
+  }
 
   def path = "/feature"
 
@@ -914,10 +931,19 @@ object FeatureZNode {
    * @return               JSON representation of the FeatureZNode, as an Array[Byte]
    */
   def encode(featureZNode: FeatureZNode): Array[Byte] = {
+    val features = if (featureZNode.version == V1) {
+      asJavaMap(featureZNode.features.map{
+        case (feature, version) => feature -> Map(V1MaxVersionKey -> version, V1MinVersionKey -> version)
+      })
+    } else {
+      asJavaMap(featureZNode.features.map{
+        case (feature, version) => feature -> Map(V1MaxVersionKey -> version)
+      })
+    }
     val jsonMap = collection.mutable.Map(
-      VersionKey -> CurrentVersion,
+      VersionKey -> featureZNode.version,
       StatusKey -> featureZNode.status.id,
-      FeaturesKey -> featureZNode.features.toMap)
+      FeaturesKey -> features)
     Json.encodeAsBytes(jsonMap.asJava)
   }
 
@@ -935,27 +961,11 @@ object FeatureZNode {
       case Right(js) =>
         val featureInfo = js.asJsonObject
         val version = featureInfo(VersionKey).to[Int]
-        if (version < V1) {
+        if (version < V1 || version > V2) {
           throw new IllegalArgumentException(s"Unsupported version: $version of feature information: " +
             s"${new String(jsonBytes, UTF_8)}")
         }
 
-        val featuresMap = featureInfo
-          .get(FeaturesKey)
-          .flatMap(_.to[Option[Map[String, Map[String, Int]]]])
-
-        if (featuresMap.isEmpty) {
-          throw new IllegalArgumentException("Features map can not be absent in: " +
-            s"${new String(jsonBytes, UTF_8)}")
-        }
-        val features = asJavaMap(
-          featuresMap
-            .map(theMap => theMap.map {
-              case (featureName, versionInfo) => featureName -> versionInfo.map {
-                case (label, version) => label -> version.asInstanceOf[Short]
-              }
-            }).getOrElse(Map[String, Map[String, Short]]()))
-
         val statusInt = featureInfo
           .get(StatusKey)
           .flatMap(_.to[Option[Int]])
@@ -969,19 +979,44 @@ object FeatureZNode {
             s"Malformed status: $statusInt found in feature information: ${new String(jsonBytes, UTF_8)}")
         }
 
-        var finalizedFeatures: Features[FinalizedVersionRange] = null
-        try {
-          finalizedFeatures = fromFinalizedFeaturesMap(features)
-        } catch {
-          case e: Exception => throw new IllegalArgumentException(
-            "Unable to convert to finalized features from map: " + features, e)
-        }
-        FeatureZNode(status.get, finalizedFeatures)
+        val finalizedFeatures = decodeFeature(version, featureInfo, jsonBytes)
+        FeatureZNode(version, status.get, finalizedFeatures)
       case Left(e) =>
         throw new IllegalArgumentException(s"Failed to parse feature information: " +
           s"${new String(jsonBytes, UTF_8)}", e)
     }
   }
+
+  private def decodeFeature(version: Int, featureInfo: JsonObject, jsonBytes: Array[Byte]): Map[String, Short] = {
+    val featuresMap = featureInfo
+      .get(FeaturesKey)
+      .flatMap(_.to[Option[Map[String, Map[String, Int]]]])
+
+    if (featuresMap.isEmpty) {
+      throw new IllegalArgumentException("Features map can not be absent in: " +
+        s"${new String(jsonBytes, UTF_8)}")
+    }
+    featuresMap.get.map {
+      case (featureName, versionInfo) =>
+        if (version == V1 && !versionInfo.contains(V1MinVersionKey)) {
+          throw new IllegalArgumentException(s"$V1MinVersionKey absent in [$versionInfo]")
+        }
+        if (!versionInfo.contains(V1MaxVersionKey)) {
+          throw new IllegalArgumentException(s"$V1MaxVersionKey absent in [$versionInfo]")
+        }
+
+        val minValueOpt = versionInfo.get(V1MinVersionKey)
+        val maxValue = versionInfo(V1MaxVersionKey)
+
+        if (version == V1 && (minValueOpt.get < 1 || maxValue < minValueOpt.get)) {
+          throw new IllegalArgumentException(s"Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received minValue: ${minValueOpt.get}, maxValue: $maxValue")
+        }
+        if (maxValue < 1) {
+          throw new IllegalArgumentException(s"Expected maxValue >= 1, but received maxValue: $maxValue")
+        }
+        featureName -> maxValue.toShort
+    }
+  }
 }
 
 object ZkData {
diff --git a/core/src/test/scala/kafka/zk/FeatureZNodeTest.scala b/core/src/test/scala/kafka/zk/FeatureZNodeTest.scala
index 9344724ff1..b7778c1d5f 100644
--- a/core/src/test/scala/kafka/zk/FeatureZNodeTest.scala
+++ b/core/src/test/scala/kafka/zk/FeatureZNodeTest.scala
@@ -17,31 +17,40 @@
 
 package kafka.zk
 
-import java.nio.charset.StandardCharsets
 
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
-import org.apache.kafka.common.feature.Features._
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.apache.kafka.server.common.MetadataVersion.{IBP_3_2_IV0, IBP_3_3_IV0}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
 
-import scala.jdk.CollectionConverters._
+import java.nio.charset.StandardCharsets
 
 class FeatureZNodeTest {
 
   @Test
   def testEncodeDecode(): Unit = {
-    val featureZNode = FeatureZNode(
+    val featureZNodeV1 = FeatureZNode(
+      IBP_3_2_IV0,
+      FeatureZNodeStatus.Enabled,
+
+      Map[String, Short](
+        "feature1" -> 2,
+        "feature2" -> 4))
+    val decodedV1 = FeatureZNode.decode(FeatureZNode.encode(featureZNodeV1))
+    assertEquals(featureZNodeV1, decodedV1)
+
+    val featureZNodeV2 = FeatureZNode(
+      IBP_3_3_IV0,
       FeatureZNodeStatus.Enabled,
-      Features.finalizedFeatures(
-        Map[String, FinalizedVersionRange](
-          "feature1" -> new FinalizedVersionRange(1, 2),
-          "feature2" -> new FinalizedVersionRange(2, 4)).asJava))
-    val decoded = FeatureZNode.decode(FeatureZNode.encode(featureZNode))
-    assertEquals(featureZNode, decoded)
+
+      Map[String, Short](
+        "feature1" -> 2,
+        "feature2" -> 4))
+    val decodedV2 = FeatureZNode.decode(FeatureZNode.encode(featureZNodeV2))
+    assertEquals(featureZNodeV2, decodedV2)
   }
 
   @Test
-  def testDecodeSuccess(): Unit = {
+  def testDecodeSuccessV1(): Unit = {
     val featureZNodeStrTemplate = """{
       "version":1,
       "status":1,
@@ -52,15 +61,36 @@ class FeatureZNodeTest {
     val node1 = FeatureZNode.decode(featureZNodeStrTemplate.format(validFeatures).getBytes(StandardCharsets.UTF_8))
     assertEquals(FeatureZNodeStatus.Enabled, node1.status)
     assertEquals(
-      Features.finalizedFeatures(
-        Map[String, FinalizedVersionRange](
-          "feature1" -> new FinalizedVersionRange(1, 2),
-          "feature2" -> new FinalizedVersionRange(2, 4)).asJava), node1.features)
+      Map[String, Short](
+        "feature1" -> 2,
+        "feature2" -> 4), node1.features)
+
+    val emptyFeatures = "{}"
+    val node2 = FeatureZNode.decode(featureZNodeStrTemplate.format(emptyFeatures).getBytes(StandardCharsets.UTF_8))
+    assertEquals(FeatureZNodeStatus.Enabled, node2.status)
+    assertEquals(Map.empty, node2.features)
+  }
+
+  @Test
+  def testDecodeSuccessV2(): Unit = {
+    val featureZNodeStrTemplate = """{
+      "version":2,
+      "status":1,
+      "features":%s
+    }"""
+
+    val validFeatures = """{"feature1": {"max_version_level": 2}, "feature2": {"max_version_level": 4}}"""
+    val node1 = FeatureZNode.decode(featureZNodeStrTemplate.format(validFeatures).getBytes(StandardCharsets.UTF_8))
+    assertEquals(FeatureZNodeStatus.Enabled, node1.status)
+    assertEquals(
+      Map[String, Short](
+        "feature1" -> 2,
+        "feature2" -> 4), node1.features)
 
     val emptyFeatures = "{}"
     val node2 = FeatureZNode.decode(featureZNodeStrTemplate.format(emptyFeatures).getBytes(StandardCharsets.UTF_8))
     assertEquals(FeatureZNodeStatus.Enabled, node2.status)
-    assertEquals(emptyFinalizedFeatures, node2.features)
+    assertEquals(Map.empty, node2.features)
   }
 
   @Test
@@ -73,11 +103,11 @@ class FeatureZNodeTest {
     }"""
     assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(FeatureZNode.V1 - 1, 1).getBytes(StandardCharsets.UTF_8)))
     val invalidStatus = FeatureZNodeStatus.Enabled.id + 1
-    assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(FeatureZNode.CurrentVersion, invalidStatus).getBytes(StandardCharsets.UTF_8)))
+    assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(FeatureZNode.V2, invalidStatus).getBytes(StandardCharsets.UTF_8)))
   }
 
   @Test
-  def testDecodeFailOnInvalidFeatures(): Unit = {
+  def testDecodeFailOnInvalidFeaturesV1(): Unit = {
     val featureZNodeStrTemplate =
       """{
       "version":1,
@@ -99,4 +129,29 @@ class FeatureZNodeTest {
     val invalidFeaturesMissingMinVersionLevel = ""","features":{"feature1": {"max_version_level": 1}}"""
     assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(invalidFeaturesMissingMinVersionLevel).getBytes(StandardCharsets.UTF_8)))
   }
+
+  @Test
+  def testDecodeFailOnInvalidFeaturesV2(): Unit = {
+    val featureZNodeStrTemplate =
+      """{
+      "version":2,
+      "status":1%s
+    }"""
+
+    val missingFeatures = ""
+    assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(missingFeatures).getBytes(StandardCharsets.UTF_8)))
+
+    val malformedFeatures = ""","features":{"feature1": {"min_version_level": 1, "max_version_level": 2}, "partial"}"""
+    assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(malformedFeatures).getBytes(StandardCharsets.UTF_8)))
+
+    // We only inspect these configs in v1
+    val invalidFeaturesMinVersionLevel = ""","features":{"feature1": {"min_version_level": 0, "max_version_level": 2}}"""
+    assertDoesNotThrow(() => FeatureZNode.decode(featureZNodeStrTemplate.format(invalidFeaturesMinVersionLevel).getBytes(StandardCharsets.UTF_8)))
+
+    val invalidFeaturesMaxVersionLevel = ""","features":{"feature1": {"min_version_level": 2, "max_version_level": 1}}"""
+    assertDoesNotThrow(() => FeatureZNode.decode(featureZNodeStrTemplate.format(invalidFeaturesMaxVersionLevel).getBytes(StandardCharsets.UTF_8)))
+
+    val invalidFeaturesMissingMinVersionLevel = ""","features":{"feature1": {"max_version_level": 1}}"""
+    assertDoesNotThrow(() => FeatureZNode.decode(featureZNodeStrTemplate.format(invalidFeaturesMissingMinVersionLevel).getBytes(StandardCharsets.UTF_8)))
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index d53fc763eb..d8d90ede09 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -19,7 +19,6 @@ package kafka.controller
 
 import java.util.Properties
 import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
-
 import com.yammer.metrics.core.Timer
 import kafka.api.LeaderAndIsr
 import kafka.controller.KafkaController.AlterPartitionCallback
@@ -27,13 +26,12 @@ import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
 import kafka.utils.{LogCaptureAppender, TestUtils}
 import kafka.zk.{FeatureZNodeStatus, _}
 import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
-import org.apache.kafka.common.feature.Features
 import org.apache.kafka.common.metrics.KafkaMetric
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_2_6_IV0, IBP_2_7_IV0}
+import org.apache.kafka.server.common.MetadataVersion.{IBP_2_6_IV0, IBP_2_7_IV0, IBP_3_2_IV0}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.log4j.Level
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
@@ -637,12 +635,12 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
   @Test
   def testControllerFeatureZNodeSetupWhenFeatureVersioningIsEnabledWithDisabledExistingFeatureZNode(): Unit = {
-    testControllerFeatureZNodeSetup(Some(new FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures())), IBP_2_7_IV0)
+    testControllerFeatureZNodeSetup(Some(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, Map.empty[String, Short])), IBP_2_7_IV0)
   }
 
   @Test
   def testControllerFeatureZNodeSetupWhenFeatureVersioningIsEnabledWithEnabledExistingFeatureZNode(): Unit = {
-    testControllerFeatureZNodeSetup(Some(new FeatureZNode(FeatureZNodeStatus.Enabled, Features.emptyFinalizedFeatures())), IBP_2_7_IV0)
+    testControllerFeatureZNodeSetup(Some(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, Map.empty[String, Short])), IBP_2_7_IV0)
   }
 
   @Test
@@ -652,12 +650,12 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 
   @Test
   def testControllerFeatureZNodeSetupWhenFeatureVersioningIsDisabledWithDisabledExistingFeatureZNode(): Unit = {
-    testControllerFeatureZNodeSetup(Some(new FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures())), IBP_2_6_IV0)
+    testControllerFeatureZNodeSetup(Some(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, Map.empty[String, Short])), IBP_2_6_IV0)
   }
 
   @Test
   def testControllerFeatureZNodeSetupWhenFeatureVersioningIsDisabledWithEnabledExistingFeatureZNode(): Unit = {
-    testControllerFeatureZNodeSetup(Some(new FeatureZNode(FeatureZNodeStatus.Enabled, Features.emptyFinalizedFeatures())), IBP_2_6_IV0)
+    testControllerFeatureZNodeSetup(Some(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, Map.empty[String, Short])), IBP_2_6_IV0)
   }
 
   @Test
@@ -812,7 +810,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     val (mayBeFeatureZNodeBytes, versionAfter) = zkClient.getDataAndVersion(FeatureZNode.path)
     val newZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
     if (interBrokerProtocolVersion.isAtLeast(IBP_2_7_IV0)) {
-      val emptyZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, Features.emptyFinalizedFeatures)
+      val emptyZNode = FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, Map.empty[String, Short])
       initialZNode match {
         case Some(node) => {
           node.status match {
@@ -826,10 +824,10 @@ class ControllerIntegrationTest extends QuorumTestHarness {
         }
         case None =>
           assertEquals(0, versionAfter)
-          assertEquals(new FeatureZNode(FeatureZNodeStatus.Enabled, Features.emptyFinalizedFeatures), newZNode)
+          assertEquals(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, Map.empty[String, Short]), newZNode)
       }
     } else {
-      val emptyZNode = new FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures)
+      val emptyZNode = FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, Map.empty[String, Short])
       initialZNode match {
         case Some(node) => {
           node.status match {
@@ -843,7 +841,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
         }
         case None =>
           assertEquals(0, versionAfter)
-          assertEquals(new FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures), newZNode)
+          assertEquals(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, Map.empty[String, Short]), newZNode)
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
index eab3928483..ad8786c919 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
@@ -17,9 +17,10 @@
 
 package kafka.server
 
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{Disabled, Test}
+import org.junit.jupiter.api.Test
 
 import scala.jdk.CollectionConverters._
 
@@ -38,15 +39,12 @@ class BrokerFeaturesTest {
       "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
     brokerFeatures.setSupportedFeatures(supportedFeatures)
 
-    val compatibleFeatures = Map[String, FinalizedVersionRange](
-      "test_feature_1" -> new FinalizedVersionRange(2, 3))
-    val inCompatibleFeatures = Map[String, FinalizedVersionRange](
-      "test_feature_3" -> new FinalizedVersionRange(3, 4))
+    val compatibleFeatures = Map[String, Short]("test_feature_1" -> 4)
+    val inCompatibleFeatures = Map[String, Short]("test_feature_3" -> 4)
     val features = compatibleFeatures++inCompatibleFeatures
-    val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+    val finalizedFeatures = features
 
-    assertEquals(
-      Features.finalizedFeatures(inCompatibleFeatures.asJava),
+    assertEquals(inCompatibleFeatures,
       brokerFeatures.incompatibleFeatures(finalizedFeatures))
     assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
   }
@@ -59,15 +57,13 @@ class BrokerFeaturesTest {
       "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
     brokerFeatures.setSupportedFeatures(supportedFeatures)
 
-    val compatibleFeatures = Map[String, FinalizedVersionRange](
-      "test_feature_1" -> new FinalizedVersionRange(2, 3))
-    val inCompatibleFeatures = Map[String, FinalizedVersionRange](
-      "test_feature_2" -> new FinalizedVersionRange(1, 4))
+    val compatibleFeatures = Map[String, Short]("test_feature_1" -> 3)
+    val inCompatibleFeatures = Map[String, Short]("test_feature_2" -> 4)
     val features = compatibleFeatures++inCompatibleFeatures
-    val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+    val finalizedFeatures = features
 
     assertEquals(
-      Features.finalizedFeatures(inCompatibleFeatures.asJava),
+      inCompatibleFeatures,
       brokerFeatures.incompatibleFeatures(finalizedFeatures))
     assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
   }
@@ -80,16 +76,15 @@ class BrokerFeaturesTest {
       "test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
     brokerFeatures.setSupportedFeatures(supportedFeatures)
 
-    val compatibleFeatures = Map[String, FinalizedVersionRange](
-      "test_feature_1" -> new FinalizedVersionRange(2, 3),
-      "test_feature_2" -> new FinalizedVersionRange(1, 3))
-    val finalizedFeatures = Features.finalizedFeatures(compatibleFeatures.asJava)
-    assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).empty())
+    val compatibleFeatures = Map[String, Short](
+      "test_feature_1" -> 3,
+      "test_feature_2" -> 3)
+    val finalizedFeatures = compatibleFeatures
+    assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).isEmpty)
     assertFalse(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
   }
 
   @Test
-  @Disabled("Need to remove or rewrite this test after we fully remove FinalizedVersionRange")
   def testDefaultFinalizedFeatures(): Unit = {
     val brokerFeatures = BrokerFeatures.createDefault()
     val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
@@ -98,10 +93,11 @@ class BrokerFeaturesTest {
       "test_feature_3" -> new SupportedVersionRange(3, 7)).asJava)
     brokerFeatures.setSupportedFeatures(supportedFeatures)
 
-    val expectedFeatures = Map[String, FinalizedVersionRange](
-      "test_feature_1" -> new FinalizedVersionRange(1, 4),
-      "test_feature_2" -> new FinalizedVersionRange(1, 3),
-      "test_feature_3" -> new FinalizedVersionRange(3, 7))
-    assertEquals(Features.finalizedFeatures(expectedFeatures.asJava), brokerFeatures.defaultFinalizedFeatures)
+    val expectedFeatures = Map[String, Short](
+      MetadataVersion.FEATURE_NAME -> MetadataVersion.latest().featureLevel(),
+      "test_feature_1" -> 4,
+      "test_feature_2" -> 3,
+      "test_feature_3" -> 7)
+    assertEquals(expectedFeatures, brokerFeatures.defaultFinalizedFeatures)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
index d0f4c0ab05..2836fbf8da 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
 
@@ -37,9 +37,7 @@ class FinalizedFeatureCacheTest {
     val brokerFeatures = BrokerFeatures.createDefault()
     brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
 
-    val features = Map[String, FinalizedVersionRange](
-      "feature_1" -> new FinalizedVersionRange(1, 4))
-    val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+    val finalizedFeatures = Map[String, Short]("feature_1" -> 4)
 
     val cache = new FinalizedFeatureCache(brokerFeatures)
     cache.updateOrThrow(finalizedFeatures, 10)
@@ -62,9 +60,7 @@ class FinalizedFeatureCacheTest {
     val brokerFeatures = BrokerFeatures.createDefault()
     brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
 
-    val features = Map[String, FinalizedVersionRange](
-      "feature_1" -> new FinalizedVersionRange(1, 2))
-    val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+    val finalizedFeatures = Map[String, Short]("feature_1" -> 2)
 
     val cache = new FinalizedFeatureCache(brokerFeatures)
     assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateOrThrow(finalizedFeatures, 12))
@@ -80,9 +76,7 @@ class FinalizedFeatureCacheTest {
     val brokerFeatures = BrokerFeatures.createDefault()
     brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
 
-    val features = Map[String, FinalizedVersionRange](
-      "feature_1" -> new FinalizedVersionRange(2, 3))
-    val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+    val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
 
     val cache = new FinalizedFeatureCache(brokerFeatures)
     cache.updateOrThrow(finalizedFeatures, 12)
@@ -98,9 +92,7 @@ class FinalizedFeatureCacheTest {
     val brokerFeatures = BrokerFeatures.createDefault()
     brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
 
-    val features = Map[String, FinalizedVersionRange](
-      "feature_1" -> new FinalizedVersionRange(2, 3))
-    val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+    val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
 
     val cache = new FinalizedFeatureCache(brokerFeatures)
     cache.updateOrThrow(finalizedFeatures, 12)
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
index d59474efd4..67986d1683 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
@@ -17,17 +17,16 @@
 
 package kafka.server
 
-import java.util.concurrent.{CountDownLatch, TimeoutException}
-
-import kafka.server.QuorumTestHarness
-import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
 import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.apache.kafka.common.utils.Exit
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0
 import org.apache.kafka.test.{TestUtils => JTestUtils}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
+import java.util.concurrent.{CountDownLatch, TimeoutException}
 import scala.jdk.CollectionConverters._
 
 class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
@@ -42,14 +41,12 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
   }
 
   private def createFinalizedFeatures(): FinalizedFeaturesAndEpoch = {
-    val finalizedFeaturesMap = Map[String, FinalizedVersionRange](
-      "feature_1" -> new FinalizedVersionRange(2, 3))
-    val finalizedFeatures = Features.finalizedFeatures(finalizedFeaturesMap.asJava)
-    zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, finalizedFeatures))
+    val finalizedFeaturesMap = Map[String, Short]("feature_1" -> 3)
+    zkClient.createFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, finalizedFeaturesMap))
     val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
     assertNotEquals(version, ZkVersion.UnknownVersion)
     assertFalse(mayBeFeatureZNodeBytes.isEmpty)
-    FinalizedFeaturesAndEpoch(finalizedFeatures, version)
+    FinalizedFeaturesAndEpoch(finalizedFeaturesMap, version)
   }
 
   private def createListener(
@@ -87,8 +84,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
     val cache = new FinalizedFeatureCache(brokerFeatures)
     val listener = createListener(cache, Some(initialFinalizedFeatures))
 
-    def updateAndCheckCache(finalizedFeatures: Features[FinalizedVersionRange]): Unit = {
-      zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, finalizedFeatures))
+    def updateAndCheckCache(finalizedFeatures: Map[String, Short]): Unit = {
+      zkClient.updateFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, finalizedFeatures))
       val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
       assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
       assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
@@ -101,18 +98,14 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
 
     // Check if the write succeeds and a ZK notification is received that causes the feature cache
     // to be populated.
-    updateAndCheckCache(
-      Features.finalizedFeatures(
-        Map[String, FinalizedVersionRange](
-        "feature_1" -> new FinalizedVersionRange(2, 4)).asJava))
+    updateAndCheckCache(Map[String, Short]("feature_1" -> 4))
     // Check if second write succeeds and a ZK notification is again received that causes the cache
     // to be populated. This check is needed to verify that the watch on the FeatureZNode was
     // re-established after the notification was received due to the first write above.
     updateAndCheckCache(
-      Features.finalizedFeatures(
-        Map[String, FinalizedVersionRange](
-          "feature_1" -> new FinalizedVersionRange(2, 4),
-          "feature_2" -> new FinalizedVersionRange(1, 3)).asJava))
+      Map[String, Short](
+        "feature_1" -> 4,
+        "feature_2" -> 3))
   }
 
   /**
@@ -146,9 +139,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
     val cache = new FinalizedFeatureCache(brokerFeatures)
     val initialFinalizedFeatures = createFinalizedFeatures()
 
-    val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange]()
-    val updatedFinalizedFeatures = Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava)
-    zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Disabled, updatedFinalizedFeatures))
+    val updatedFinalizedFeaturesMap = Map[String, Short]()
+    zkClient.updateFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, updatedFinalizedFeaturesMap))
     val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
     assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
     assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
@@ -169,9 +161,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
 
     assertThrows(classOf[TimeoutException], () => cache.waitUntilEpochOrThrow(initialFinalizedFeatures.epoch + 1, JTestUtils.DEFAULT_MAX_WAIT_MS))
 
-    val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange]()
-    val updatedFinalizedFeatures = Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava)
-    zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Disabled, updatedFinalizedFeatures))
+    val updatedFinalizedFeaturesMap = Map[String, Short]()
+    zkClient.updateFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, updatedFinalizedFeaturesMap))
     val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
     assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
     assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
@@ -191,10 +182,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
     val brokerFeatures = createBrokerFeatures()
     val cache = new FinalizedFeatureCache(brokerFeatures)
 
-    val incompatibleFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
-      "feature_1" -> new FinalizedVersionRange(2, 5))
-    val incompatibleFinalizedFeatures = Features.finalizedFeatures(incompatibleFinalizedFeaturesMap.asJava)
-    zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, incompatibleFinalizedFeatures))
+    val incompatibleFinalizedFeaturesMap = Map[String, Short]("feature_1" -> 5)
+    zkClient.createFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, incompatibleFinalizedFeaturesMap))
     val (mayBeFeatureZNodeBytes, initialVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
     assertNotEquals(initialVersion, ZkVersion.UnknownVersion)
     assertFalse(mayBeFeatureZNodeBytes.isEmpty)
@@ -240,12 +229,9 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
 
     val exitLatch = new CountDownLatch(1)
     Exit.setExitProcedure((_, _) => exitLatch.countDown())
-    val incompatibleFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
-      "feature_1" -> new FinalizedVersionRange(
-        brokerFeatures.supportedFeatures.get("feature_1").min(),
-        (brokerFeatures.supportedFeatures.get("feature_1").max() + 1).asInstanceOf[Short]))
-    val incompatibleFinalizedFeatures = Features.finalizedFeatures(incompatibleFinalizedFeaturesMap.asJava)
-    zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, incompatibleFinalizedFeatures))
+    val incompatibleFinalizedFeaturesMap = Map[String, Short](
+      "feature_1" -> (brokerFeatures.supportedFeatures.get("feature_1").max() + 1).asInstanceOf[Short])
+    zkClient.updateFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, incompatibleFinalizedFeaturesMap))
     val (mayBeFeatureZNodeIncompatibleBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
     assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
     assertFalse(mayBeFeatureZNodeIncompatibleBytes.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
index a84082b55e..9f63aa73e1 100644
--- a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
@@ -19,20 +19,18 @@ package kafka.server
 
 import java.util.{Optional, Properties}
 import java.util.concurrent.ExecutionException
-
 import kafka.utils.TestUtils
 import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
 import kafka.utils.TestUtils.waitUntilTrue
 import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
 import org.apache.kafka.common.errors.InvalidRequestException
-import org.apache.kafka.common.feature.FinalizedVersionRange
 import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.apache.kafka.common.message.UpdateFeaturesRequestData
 import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV0
+import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV0, IBP_3_2_IV0}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertThrows, assertTrue}
 
@@ -52,8 +50,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
     Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3))))
   }
 
-  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
-    Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2))))
+  private def defaultFinalizedFeatures(): Map[String, Short] = {
+    Utils.mkMap(Utils.mkEntry("feature_1", 2.toShort)).asScala.toMap
   }
 
   private def updateSupportedFeatures(
@@ -84,9 +82,9 @@ class UpdateFeaturesTest extends BaseRequestTest {
     updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
   }
 
-  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = {
+  private def updateFeatureZNode(features: Map[String, Short]): Int = {
     val server = serverForId(0).get
-    val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+    val newNode = FeatureZNode(IBP_2_7_IV0, FeatureZNodeStatus.Enabled, features)
     val newVersion = server.zkClient.updateFeatureZNode(newNode)
     servers.foreach(s => {
       s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs)
@@ -100,11 +98,11 @@ class UpdateFeaturesTest extends BaseRequestTest {
     FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
   }
 
-  private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Features[FinalizedVersionRange] = {
-    Features.finalizedFeatures(features.asScala.map {
+  private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Map[String, Short] = {
+    features.asScala.map {
       case(name, versionRange) =>
-        (name, new FinalizedVersionRange(versionRange.minVersionLevel(), versionRange.maxVersionLevel()))
-    }.asJava)
+        (name, versionRange.maxVersionLevel())
+    }.toMap
   }
 
   private def supportedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.SupportedVersionRange]): Features[SupportedVersionRange] = {
@@ -116,7 +114,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
 
   private def checkFeatures(client: Admin,
                             expectedNode: FeatureZNode,
-                            expectedFinalizedFeatures: Features[FinalizedVersionRange],
+                            expectedFinalizedFeatures: Map[String, Short],
                             expectedFinalizedFeaturesEpoch: Long,
                             expectedSupportedFeatures: Features[SupportedVersionRange]): Unit = {
     assertEquals(expectedNode, getFeatureZNode())
@@ -183,8 +181,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
 
     val nodeBefore = getFeatureZNode()
     val validUpdates = new FeatureUpdateKeyCollection()
-    val validUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey();
-    validUpdate.setFeature("feature_1");
+    val validUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey()
+    validUpdate.setFeature("feature_1")
     validUpdate.setMaxVersionLevel(defaultSupportedFeatures().get("feature_1").max())
     validUpdate.setAllowDowngrade(false)
     validUpdates.add(validUpdate)
@@ -210,7 +208,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
    */
   @Test
   def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
-    val targetMaxVersionLevel = (defaultFinalizedFeatures().get("feature_1").max() - 1).asInstanceOf[Short]
+    val targetMaxVersionLevel = (defaultFinalizedFeatures()("feature_1") - 1).asInstanceOf[Short]
     testWithInvalidFeatureUpdate[InvalidRequestException](
       "feature_1",
       new FeatureUpdate(targetMaxVersionLevel, FeatureUpdate.UpgradeType.UPGRADE),
@@ -223,11 +221,11 @@ class UpdateFeaturesTest extends BaseRequestTest {
    */
   @Test
   def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = {
-    val targetMaxVersionLevel = (defaultFinalizedFeatures().get("feature_1").max() + 1).asInstanceOf[Short]
+    val targetMaxVersionLevel = (defaultFinalizedFeatures()("feature_1") + 1).asInstanceOf[Short]
     testWithInvalidFeatureUpdate[InvalidRequestException](
       "feature_1",
       new FeatureUpdate(targetMaxVersionLevel,  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
-      ".*When the downgradeType is set to SAFE set in the request, the provided maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
+      ".*When the downgradeType is set to SAFE in the request, the provided versionLevel:3.*existing versionLevel:2.*".r)
   }
 
   /**
@@ -245,14 +243,14 @@ class UpdateFeaturesTest extends BaseRequestTest {
     val nodeBefore = getFeatureZNode()
 
     val invalidUpdates
-      = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
-    val invalidUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey();
+      = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection()
+    val invalidUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey()
     invalidUpdate.setFeature("feature_1")
     invalidUpdate.setMaxVersionLevel(0)
     invalidUpdate.setAllowDowngrade(false)
-    invalidUpdates.add(invalidUpdate);
+    invalidUpdates.add(invalidUpdate)
     val requestData = new UpdateFeaturesRequestData()
-    requestData.setFeatureUpdates(invalidUpdates);
+    requestData.setFeatureUpdates(invalidUpdates)
 
     val response = connectAndReceive[UpdateFeaturesResponse](
       new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData().setFeatureUpdates(invalidUpdates)).build(),
@@ -264,7 +262,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
     assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
     assertNotNull(result.errorMessage)
     assertFalse(result.errorMessage.isEmpty)
-    val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than 1.*".r
+    val exceptionMsgPattern = ".*Can not provide versionLevel: 0 less than 1.*".r
     assertTrue(exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined, result.errorMessage)
     checkFeatures(
       adminClient,
@@ -292,7 +290,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
    */
   @Test
   def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
-    val targetMaxVersionLevel = defaultFinalizedFeatures().get("feature_1").max()
+    val targetMaxVersionLevel = defaultFinalizedFeatures()("feature_1")
     testWithInvalidFeatureUpdate[InvalidRequestException](
       "feature_1",
       new FeatureUpdate(targetMaxVersionLevel,  FeatureUpdate.UpgradeType.UPGRADE),
@@ -302,7 +300,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
   private def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibility(
     featureName: String,
     supportedVersionRange: SupportedVersionRange,
-    initialFinalizedVersionRange: Option[FinalizedVersionRange]
+    initialFinalizedVersionRange: Option[Short]
   ): Unit = {
     TestUtils.waitUntilControllerElected(zkClient)
 
@@ -327,8 +325,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
     updateSupportedFeatures(supportedFeaturesWithVersionIncompatibility, brokersWithVersionIncompatibility)
 
     val initialFinalizedFeatures = initialFinalizedVersionRange.map(
-      versionRange => Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(featureName, versionRange)))
-    ).getOrElse(Features.emptyFinalizedFeatures())
+      versionRange => Utils.mkMap(Utils.mkEntry(featureName, versionRange)).asScala.toMap
+    ).getOrElse(Map.empty[String, Short])
     val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
 
     val invalidUpdate = new FeatureUpdate(supportedVersionRange.max(),  FeatureUpdate.UpgradeType.UPGRADE)
@@ -358,7 +356,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
     testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibility(
       feature,
       defaultSupportedFeatures().get(feature),
-      Some(defaultFinalizedFeatures().get(feature)))
+      Some(defaultFinalizedFeatures()(feature)))
   }
 
   /**
@@ -389,14 +387,13 @@ class UpdateFeaturesTest extends BaseRequestTest {
           Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
           Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))
     updateSupportedFeaturesInAllBrokers(supportedFeatures)
-    val versionBefore = updateFeatureZNode(Features.emptyFinalizedFeatures())
+    val versionBefore = updateFeatureZNode(Map.empty)
 
-    val targetFinalizedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
-    val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(),  FeatureUpdate.UpgradeType.UPGRADE)
-    val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(),  FeatureUpdate.UpgradeType.UPGRADE)
+    val targetFinalizedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", 3.toShort),
+      Utils.mkEntry("feature_2", 3.toShort)).asScala.toMap
+    val update1 = new FeatureUpdate(targetFinalizedFeatures("feature_1"),  FeatureUpdate.UpgradeType.UPGRADE)
+    val update2 = new FeatureUpdate(targetFinalizedFeatures("feature_2"),  FeatureUpdate.UpgradeType.UPGRADE)
 
     val adminClient = createAdminClient()
     adminClient.updateFeatures(
@@ -406,7 +403,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
 
     checkFeatures(
       adminClient,
-      new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
+      FeatureZNode(IBP_2_7_IV0, FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
       targetFinalizedFeatures,
       versionBefore + 1,
       supportedFeatures)
@@ -425,21 +422,19 @@ class UpdateFeaturesTest extends BaseRequestTest {
         Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
         Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))
     updateSupportedFeaturesInAllBrokers(supportedFeatures)
-    val initialFinalizedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
+    val initialFinalizedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", 2.toShort),
+      Utils.mkEntry("feature_2", 4.toShort)).asScala.toMap
     val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
 
     // Below we aim to do the following:
     // - Valid upgrade of feature_1 maxVersionLevel from 2 to 3
     // - Valid downgrade of feature_2 maxVersionLevel from 4 to 3
-    val targetFinalizedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
-    val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(),  FeatureUpdate.UpgradeType.UPGRADE)
-    val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(),  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
+    val targetFinalizedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", 3.toShort),
+      Utils.mkEntry("feature_2", 3.toShort)).asScala.toMap
+    val update1 = new FeatureUpdate(targetFinalizedFeatures("feature_1"),  FeatureUpdate.UpgradeType.UPGRADE)
+    val update2 = new FeatureUpdate(targetFinalizedFeatures("feature_2"),  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
 
     val adminClient = createAdminClient()
     adminClient.updateFeatures(
@@ -449,7 +444,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
 
     checkFeatures(
       adminClient,
-      new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
+      FeatureZNode(IBP_2_7_IV0, FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
       targetFinalizedFeatures,
       versionBefore + 1,
       supportedFeatures)
@@ -469,22 +464,20 @@ class UpdateFeaturesTest extends BaseRequestTest {
         Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
         Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))
     updateSupportedFeaturesInAllBrokers(supportedFeatures)
-    val initialFinalizedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
+    val initialFinalizedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", 2.toShort),
+      Utils.mkEntry("feature_2", 4.toShort)).asScala.toMap
     val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
 
     // Below we aim to do the following:
     // - Valid upgrade of feature_1 maxVersionLevel from 2 to 3
     // - Invalid downgrade of feature_2 maxVersionLevel from 4 to 3
     //   (because we intentionally do not set the allowDowngrade flag)
-    val targetFinalizedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
-    val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(),  FeatureUpdate.UpgradeType.UPGRADE)
-    val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(),  FeatureUpdate.UpgradeType.UPGRADE)
+    val targetFinalizedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", 3.toShort),
+      Utils.mkEntry("feature_2", 3.toShort)).asScala.toMap
+    val validUpdate = new FeatureUpdate(targetFinalizedFeatures("feature_1"),  FeatureUpdate.UpgradeType.UPGRADE)
+    val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures("feature_2"),  FeatureUpdate.UpgradeType.UPGRADE)
 
     val adminClient = createAdminClient()
     val result = adminClient.updateFeatures(
@@ -496,13 +489,12 @@ class UpdateFeaturesTest extends BaseRequestTest {
     // Expect update for "feature_2" to have failed.
     checkException[InvalidRequestException](
       result, Map("feature_2" -> ".*Can not downgrade finalized feature.*".r))
-    val expectedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", targetFinalizedFeatures.get("feature_1")),
-        Utils.mkEntry("feature_2", initialFinalizedFeatures.get("feature_2"))))
+    val expectedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", targetFinalizedFeatures("feature_1")),
+      Utils.mkEntry("feature_2", initialFinalizedFeatures("feature_2"))).asScala.toMap
     checkFeatures(
       adminClient,
-      FeatureZNode(FeatureZNodeStatus.Enabled, expectedFeatures),
+      FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, expectedFeatures),
       expectedFeatures,
       versionBefore + 1,
       supportedFeatures)
@@ -537,22 +529,20 @@ class UpdateFeaturesTest extends BaseRequestTest {
         Utils.mkEntry("feature_2", supportedFeatures.get("feature_2"))))
     updateSupportedFeatures(supportedFeaturesWithVersionIncompatibility, brokersWithVersionIncompatibility)
 
-    val initialFinalizedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
+    val initialFinalizedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", 2.toShort),
+      Utils.mkEntry("feature_2", 4.toShort)).asScala.toMap
     val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
 
     // Below we aim to do the following:
     // - Invalid upgrade of feature_1 maxVersionLevel from 2 to 3
     //   (because one of the brokers does not support the max version: 3)
     // - Valid downgrade of feature_2 maxVersionLevel from 4 to 3
-    val targetFinalizedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
-    val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(),  FeatureUpdate.UpgradeType.UPGRADE)
-    val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(),  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
+    val targetFinalizedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", 3.toShort),
+      Utils.mkEntry("feature_2", 3.toShort)).asScala.toMap
+    val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures("feature_1"),  FeatureUpdate.UpgradeType.UPGRADE)
+    val validUpdate = new FeatureUpdate(targetFinalizedFeatures("feature_2"),  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
 
     val adminClient = createAdminClient()
     val result = adminClient.updateFeatures(
@@ -563,13 +553,12 @@ class UpdateFeaturesTest extends BaseRequestTest {
     result.values().get("feature_2").get()
     // Expect update for "feature_1" to have failed.
     checkException[InvalidRequestException](result, Map("feature_1" -> ".*brokers.*incompatible.*".r))
-    val expectedFeatures = Features.finalizedFeatures(
-      Utils.mkMap(
-        Utils.mkEntry("feature_1", initialFinalizedFeatures.get("feature_1")),
-        Utils.mkEntry("feature_2", targetFinalizedFeatures.get("feature_2"))))
+    val expectedFeatures = Utils.mkMap(
+      Utils.mkEntry("feature_1", initialFinalizedFeatures("feature_1")),
+      Utils.mkEntry("feature_2", targetFinalizedFeatures("feature_2"))).asScala.toMap
     checkFeatures(
       adminClient,
-      FeatureZNode(FeatureZNodeStatus.Enabled, expectedFeatures),
+      FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, expectedFeatures),
       expectedFeatures,
       versionBefore + 1,
       supportedFeatures)
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 455e051541..7f3c12122e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -152,7 +152,7 @@ public enum MetadataVersion {
     // Support for leader recovery for unclean leader election (KIP-704)
     IBP_3_2_IV0(4, "3.2", "IV0", false),
 
-    // Support for metadata.version feature flag (KIP-778)
+    // Support for metadata.version feature flag and Removes min_version_level from the finalized version range that is written to ZooKeeper (KIP-778)
     IBP_3_3_IV0(5, "3.3", "IV0", false);
 
     public static final String FEATURE_NAME = "metadata.version";
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 8a825e3da2..bbce1b6ae4 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -59,6 +59,7 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -186,6 +187,9 @@ class MetadataVersionTest {
 
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));
+
+        assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3"));
+        assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3-IV0"));
     }
 
     @Test
@@ -228,7 +232,7 @@ class MetadataVersionTest {
         assertEquals("3.0", IBP_3_0_IV0.shortVersion());
         assertEquals("3.0", IBP_3_0_IV1.shortVersion());
         assertEquals("3.1", IBP_3_1_IV0.shortVersion());
-        assertEquals("3.2", IBP_3_2_IV0.shortVersion());
+        assertEquals("3.3", IBP_3_3_IV0.shortVersion());
     }
 
     @Test
@@ -261,6 +265,7 @@ class MetadataVersionTest {
         assertEquals("3.0-IV1", IBP_3_0_IV1.version());
         assertEquals("3.1-IV0", IBP_3_1_IV0.version());
         assertEquals("3.2-IV0", IBP_3_2_IV0.version());
+        assertEquals("3.3-IV0", IBP_3_3_IV0.version());
     }
 
     @Test