You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/04/14 17:04:42 UTC

[kafka] branch trunk updated: KAFKA-13823 Feature flag changes from KIP-778 (#12036)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55ff5d3603 KAFKA-13823 Feature flag changes from KIP-778 (#12036)
55ff5d3603 is described below

commit 55ff5d360381af370fe5b3a215831beac49571a4
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Apr 14 13:04:32 2022 -0400

    KAFKA-13823 Feature flag changes from KIP-778 (#12036)
    
    This PR includes the changes to feature flags that were outlined in KIP-778.  Specifically, it
    changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds
    dry-run to the RPC so the controller can actually attempt the upgrade (rather than the client). It
    introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
    FeatureLevelRecord was unused previously, we do not need to introduce a new version.
    
    The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
    downgrade, and disable sub-commands.  Refer to
    [KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
    details on the new command structure.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, dengziming <de...@gmail.com>
---
 checkstyle/suppressions.xml                        |   4 +-
 .../apache/kafka/clients/admin/FeatureUpdate.java  |  75 ++-
 .../kafka/clients/admin/KafkaAdminClient.java      |   3 +-
 .../kafka/clients/admin/UpdateFeaturesOptions.java |  20 +
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   2 +-
 .../common/requests/UpdateFeaturesRequest.java     |  54 +-
 .../common/message/UpdateFeaturesRequest.json      |  14 +-
 .../common/message/UpdateFeaturesResponse.json     |   2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  14 +-
 .../apache/kafka/common/protocol/ApiKeysTest.java  |   2 +-
 .../common/requests/UpdateFeaturesRequestTest.java |  90 ++++
 .../main/scala/kafka/admin/FeatureCommand.scala    | 581 ++++++++++-----------
 .../scala/kafka/controller/KafkaController.scala   |  57 +-
 .../main/scala/kafka/server/BrokerFeatures.scala   |  12 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  20 +-
 .../main/scala/kafka/server/ControllerApis.scala   |  19 +-
 .../main/scala/kafka/server/ControllerServer.scala |   9 +-
 .../scala/kafka/server/FinalizedFeatureCache.scala |   6 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   3 +-
 core/src/main/scala/kafka/server/Server.scala      |   6 -
 core/src/test/java/kafka/test/MockController.java  |  11 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |   4 +-
 .../kafka/server/QuorumTestHarness.scala           |   3 +-
 .../unit/kafka/admin/FeatureCommandTest.scala      | 182 +------
 .../unit/kafka/server/BrokerFeaturesTest.scala     |   3 +-
 .../unit/kafka/server/ControllerApisTest.scala     |   1 -
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../unit/kafka/server/UpdateFeaturesTest.scala     |  62 +--
 .../kafka/controller/ClusterControlManager.java    |  24 +-
 .../org/apache/kafka/controller/Controller.java    |  15 +-
 .../kafka/controller/FeatureControlManager.java    | 115 ++--
 .../kafka/controller/ProducerIdControlManager.java |  24 +-
 .../apache/kafka/controller/QuorumController.java  |  50 +-
 .../apache/kafka/controller/QuorumFeatures.java    |  51 ++
 .../java/org/apache/kafka/image/FeaturesDelta.java |  19 +-
 .../java/org/apache/kafka/image/FeaturesImage.java |  22 +-
 .../apache/kafka/metadata/BrokerRegistration.java  |   2 +-
 .../java/org/apache/kafka/metadata/FeatureMap.java |  67 ---
 ...Epoch.java => FinalizedControllerFeatures.java} |  30 +-
 .../org/apache/kafka/metadata/VersionRange.java    |  22 +-
 .../common/metadata/FeatureLevelRecord.json        |   6 +-
 .../controller/ClusterControlManagerTest.java      |   5 +-
 .../controller/FeatureControlManagerTest.java      | 119 +++--
 .../kafka/controller/QuorumControllerTest.java     |  10 +-
 .../kafka/controller/QuorumControllerTestEnv.java  |   1 +
 .../org/apache/kafka/image/ClusterImageTest.java   |   8 +-
 .../org/apache/kafka/image/FeaturesImageTest.java  |  15 +-
 .../kafka/metadata/BrokerRegistrationTest.java     |   6 +-
 .../apache/kafka/metadata/VersionRangeTest.java    |  19 +-
 .../kafka/raft/internals/RecordsIterator.java      |   1 +
 50 files changed, 1027 insertions(+), 865 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0824e0b60b..c1d3cb078d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -291,12 +291,12 @@
               files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(QuorumController|ReplicationControlManager|ReplicationControlManagerTest).java"/>
-    <suppress checks="ParameterNumber"
+    <suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
               files="(QuorumController).java"/>
     <suppress checks="CyclomaticComplexity"
               files="(ClientQuotasImage|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
     <suppress checks="NPathComplexity"
-              files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager).java"/>
+              files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager).java"/>
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
             files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="BooleanExpressionComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java b/clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
index 38753af3fe..b1dd026078 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
@@ -23,33 +23,86 @@ import java.util.Objects;
  */
 public class FeatureUpdate {
     private final short maxVersionLevel;
-    private final boolean allowDowngrade;
+    private final UpgradeType upgradeType;
+
+    public enum UpgradeType {
+        UNKNOWN(0),
+        UPGRADE(1),
+        SAFE_DOWNGRADE(2),
+        UNSAFE_DOWNGRADE(3);
+
+        private final byte code;
+
+        UpgradeType(int code) {
+            this.code = (byte) code;
+        }
+
+        public byte code() {
+            return code;
+        }
+
+        public static UpgradeType fromCode(int code) {
+            if (code == 1) {
+                return UPGRADE;
+            } else if (code == 2) {
+                return SAFE_DOWNGRADE;
+            } else if (code == 3) {
+                return UNSAFE_DOWNGRADE;
+            } else {
+                return UNKNOWN;
+            }
+        }
+    }
 
     /**
      * @param maxVersionLevel   the new maximum version level for the finalized feature.
-     *                          a value &lt; 1 is special and indicates that the update is intended to
+     *                          a value of zero is special and indicates that the update is intended to
      *                          delete the finalized feature, and should be accompanied by setting
      *                          the allowDowngrade flag to true.
      * @param allowDowngrade    - true, if this feature update was meant to downgrade the existing
-     *                            maximum version level of the finalized feature.
+     *                            maximum version level of the finalized feature. Only "safe" downgrades are
+     *                            enabled with this boolean. See {@link FeatureUpdate#FeatureUpdate(short, UpgradeType)}
      *                          - false, otherwise.
      */
+    @Deprecated
     public FeatureUpdate(final short maxVersionLevel, final boolean allowDowngrade) {
-        if (maxVersionLevel < 1 && !allowDowngrade) {
+        this(maxVersionLevel, allowDowngrade ? UpgradeType.SAFE_DOWNGRADE : UpgradeType.UPGRADE);
+    }
+
+    /**
+     * @param maxVersionLevel   The new maximum version level for the finalized feature.
+     *                          a value of zero is special and indicates that the update is intended to
+     *                          delete the finalized feature, and should be accompanied by setting
+     *                          the upgradeType to safe or unsafe.
+     * @param upgradeType     Indicate what kind of upgrade should be performed in this operation.
+     *                          - UPGRADE: upgrading the feature level
+     *                          - SAFE_DOWNGRADE: only downgrades which do not result in metadata loss are permitted
+     *                          - UNSAFE_DOWNGRADE: any downgrade, including those which may result in metadata loss, are permitted
+     */
+    public FeatureUpdate(final short maxVersionLevel, final UpgradeType upgradeType) {
+        if (maxVersionLevel == 0 && upgradeType.equals(UpgradeType.UPGRADE)) {
             throw new IllegalArgumentException(String.format(
-                "The allowDowngrade flag should be set when the provided maxVersionLevel:%d is < 1.",
-                maxVersionLevel));
+                    "The downgradeType flag should be set to SAFE or UNSAFE when the provided maxVersionLevel:%d is < 1.",
+                    maxVersionLevel));
+        }
+        if (maxVersionLevel < 0) {
+            throw new IllegalArgumentException("Cannot specify a negative version level.");
         }
         this.maxVersionLevel = maxVersionLevel;
-        this.allowDowngrade = allowDowngrade;
+        this.upgradeType = upgradeType;
     }
 
     public short maxVersionLevel() {
         return maxVersionLevel;
     }
 
+    @Deprecated
     public boolean allowDowngrade() {
-        return allowDowngrade;
+        return upgradeType != UpgradeType.UPGRADE;
+    }
+
+    public UpgradeType upgradeType() {
+        return upgradeType;
     }
 
     @Override
@@ -63,16 +116,16 @@ public class FeatureUpdate {
         }
 
         final FeatureUpdate that = (FeatureUpdate) other;
-        return this.maxVersionLevel == that.maxVersionLevel && this.allowDowngrade == that.allowDowngrade;
+        return this.maxVersionLevel == that.maxVersionLevel && this.upgradeType.equals(that.upgradeType);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxVersionLevel, allowDowngrade);
+        return Objects.hash(maxVersionLevel, upgradeType);
     }
 
     @Override
     public String toString() {
-        return String.format("FeatureUpdate{maxVersionLevel:%d, allowDowngrade:%s}", maxVersionLevel, allowDowngrade);
+        return String.format("FeatureUpdate{maxVersionLevel:%d, downgradeType:%s}", maxVersionLevel, upgradeType);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index cf99556b0f..cc913bfda0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4265,12 +4265,13 @@ public class KafkaAdminClient extends AdminClient {
                         new UpdateFeaturesRequestData.FeatureUpdateKey();
                     requestItem.setFeature(feature);
                     requestItem.setMaxVersionLevel(update.maxVersionLevel());
-                    requestItem.setAllowDowngrade(update.allowDowngrade());
+                    requestItem.setUpgradeType(update.upgradeType().code());
                     featureUpdatesRequestData.add(requestItem);
                 }
                 return new UpdateFeaturesRequest.Builder(
                     new UpdateFeaturesRequestData()
                         .setTimeoutMs(timeoutMs)
+                        .setValidateOnly(options.validateOnly())
                         .setFeatureUpdates(featureUpdatesRequestData));
             }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
index 7a9f2141b2..455f2b87d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
@@ -26,4 +26,24 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  */
 @InterfaceStability.Evolving
 public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {
+    private boolean validateOnly = false;
+
+    @Deprecated
+    public boolean dryRun() {
+        return validateOnly;
+    }
+
+    public boolean validateOnly() {
+        return validateOnly;
+    }
+
+    @Deprecated
+    public UpdateFeaturesOptions dryRun(boolean dryRun) {
+        return validateOnly(dryRun);
+    }
+
+    public UpdateFeaturesOptions validateOnly(boolean validateOnly) {
+        this.validateOnly = validateOnly;
+        return this;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index ad96cdfa42..399b631d76 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -98,7 +98,7 @@ public enum ApiKeys {
     END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
     DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, true),
     ALTER_PARTITION(ApiMessageType.ALTER_PARTITION, true),
-    UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES),
+    UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, true, true),
     ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false),
     FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false),
     DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
index 7a6bf66cd9..27cddfadca 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
@@ -16,15 +16,46 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.stream.Collectors;
 
 public class UpdateFeaturesRequest extends AbstractRequest {
 
+    public static class FeatureUpdateItem {
+        private final String featureName;
+        private final short featureLevel;
+        private final FeatureUpdate.UpgradeType upgradeType;
+
+        public FeatureUpdateItem(String featureName, short featureLevel, FeatureUpdate.UpgradeType upgradeType) {
+            this.featureName = featureName;
+            this.featureLevel = featureLevel;
+            this.upgradeType = upgradeType;
+        }
+
+        public String feature() {
+            return featureName;
+        }
+
+        public short versionLevel() {
+            return featureLevel;
+        }
+
+        public FeatureUpdate.UpgradeType upgradeType() {
+            return upgradeType;
+        }
+
+        public boolean isDeleteRequest() {
+            return featureLevel < 1 && !upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE);
+        }
+    }
+
     public static class Builder extends AbstractRequest.Builder<UpdateFeaturesRequest> {
 
         private final UpdateFeaturesRequestData data;
@@ -52,6 +83,25 @@ public class UpdateFeaturesRequest extends AbstractRequest {
         this.data = data;
     }
 
+    public FeatureUpdateItem getFeature(String name) {
+        UpdateFeaturesRequestData.FeatureUpdateKey update = data.featureUpdates().find(name);
+        if (super.version() == 0) {
+            if (update.allowDowngrade()) {
+                return new FeatureUpdateItem(update.feature(), update.maxVersionLevel(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
+            } else {
+                return new FeatureUpdateItem(update.feature(), update.maxVersionLevel(), FeatureUpdate.UpgradeType.UPGRADE);
+            }
+        } else {
+            return new FeatureUpdateItem(update.feature(), update.maxVersionLevel(), FeatureUpdate.UpgradeType.fromCode(update.upgradeType()));
+        }
+    }
+
+    public Collection<FeatureUpdateItem> featureUpdates() {
+        return data.featureUpdates().stream()
+            .map(update -> getFeature(update.feature()))
+            .collect(Collectors.toList());
+    }
+
     @Override
     public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         return UpdateFeaturesResponse.createWithErrors(
@@ -69,8 +119,4 @@ public class UpdateFeaturesRequest extends AbstractRequest {
     public static UpdateFeaturesRequest parse(ByteBuffer buffer, short version) {
         return new UpdateFeaturesRequest(new UpdateFeaturesRequestData(new ByteBufferAccessor(buffer), version), version);
     }
-
-    public static boolean isDeleteRequest(UpdateFeaturesRequestData.FeatureUpdateKey update) {
-        return update.maxVersionLevel() < 1 && update.allowDowngrade();
-    }
 }
diff --git a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
index 2b3181362d..27ed8420fb 100644
--- a/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
+++ b/clients/src/main/resources/common/message/UpdateFeaturesRequest.json
@@ -16,9 +16,9 @@
 {
   "apiKey": 57,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "UpdateFeaturesRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
@@ -29,8 +29,12 @@
         "about": "The name of the finalized feature to be updated."},
       {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
         "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."},
-      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
-        "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}
-    ]}
+      {"name": "AllowDowngrade", "type": "bool", "versions": "0",
+        "about": "DEPRECATED in version 1 (see DowngradeType). When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."},
+      {"name": "UpgradeType", "type": "int8", "versions": "1+", "default": 1,
+        "about": "Determine which type of upgrade will be performed: 1 will perform an upgrade only (default), 2 is safe downgrades only (lossless), 3 is unsafe downgrades (lossy)."}
+    ]},
+    {"name": "ValidateOnly", "type": "bool", "versions": "1+", "default": false,
+      "about": "True if we should validate the request, but not perform the upgrade or downgrade."}
   ]
 }
diff --git a/clients/src/main/resources/common/message/UpdateFeaturesResponse.json b/clients/src/main/resources/common/message/UpdateFeaturesResponse.json
index 63e84ff968..033926b801 100644
--- a/clients/src/main/resources/common/message/UpdateFeaturesResponse.json
+++ b/clients/src/main/resources/common/message/UpdateFeaturesResponse.json
@@ -17,7 +17,7 @@
   "apiKey": 57,
   "type": "response",
   "name": "UpdateFeaturesResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c337831b07..eb4681856e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4685,8 +4685,8 @@ public class KafkaAdminClientTest {
 
     private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
         return Utils.mkMap(
-            Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),
-            Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, true)));
+            Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,  FeatureUpdate.UpgradeType.UPGRADE)),
+            Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3,  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)));
     }
 
     private Map<String, ApiError> makeTestFeatureUpdateErrors(final Map<String, FeatureUpdate> updates, final Errors error) {
@@ -4782,8 +4782,8 @@ public class KafkaAdminClientTest {
                 env.cluster().nodeById(controllerId));
             final KafkaFuture<Void> future = env.adminClient().updateFeatures(
                 Utils.mkMap(
-                    Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),
-                    Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, true))),
+                    Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,  FeatureUpdate.UpgradeType.UPGRADE)),
+                    Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3,  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))),
                 new UpdateFeaturesOptions().timeoutMs(10000)
             ).all();
             future.get();
@@ -4806,8 +4806,8 @@ public class KafkaAdminClientTest {
             assertThrows(
                 IllegalArgumentException.class,
                 () -> env.adminClient().updateFeatures(
-                    Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2, false)),
-                                Utils.mkEntry("", new FeatureUpdate((short) 2, false))),
+                    Utils.mkMap(Utils.mkEntry("feature", new FeatureUpdate((short) 2,  FeatureUpdate.UpgradeType.UPGRADE)),
+                                Utils.mkEntry("", new FeatureUpdate((short) 2,  FeatureUpdate.UpgradeType.UPGRADE))),
                     new UpdateFeaturesOptions()));
         }
     }
@@ -4816,7 +4816,7 @@ public class KafkaAdminClientTest {
     public void testUpdateFeaturesShouldFailRequestInClientWhenDowngradeFlagIsNotSetDuringDeletion() {
         assertThrows(
             IllegalArgumentException.class,
-            () -> new FeatureUpdate((short) 0, false));
+            () -> new FeatureUpdate((short) 0,  FeatureUpdate.UpgradeType.UPGRADE));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index ec6259ae72..7e0a6e438e 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -62,7 +62,7 @@ public class ApiKeysTest {
     public void testResponseThrottleTime() {
         Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
         // Newer protocol apis include throttle time ms even for cluster actions
-        Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_PARTITION, ApiKeys.ALLOCATE_PRODUCER_IDS);
+        Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_PARTITION, ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES);
         for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) {
             Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()];
             BoundField throttleTimeField = responseSchema.get("throttle_time_ms");
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
index 1b63aecd01..cf267da557 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
@@ -16,14 +16,20 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.junit.jupiter.api.Test;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class UpdateFeaturesRequestTest {
 
@@ -53,4 +59,88 @@ public class UpdateFeaturesRequestTest {
         assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 1), response.errorCounts());
     }
 
+    @Test
+    public void testUpdateFeaturesV0() {
+        UpdateFeaturesRequestData.FeatureUpdateKeyCollection features =
+                new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+
+        features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+            .setFeature("foo")
+            .setMaxVersionLevel((short) 1)
+            .setAllowDowngrade(true)
+        );
+
+        features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+            .setFeature("bar")
+            .setMaxVersionLevel((short) 3)
+        );
+
+        UpdateFeaturesRequest request = new UpdateFeaturesRequest(
+            new UpdateFeaturesRequestData().setFeatureUpdates(features),
+            UpdateFeaturesRequestData.LOWEST_SUPPORTED_VERSION
+        );
+        ByteBuffer buffer = request.serialize();
+        request = UpdateFeaturesRequest.parse(buffer, UpdateFeaturesRequestData.LOWEST_SUPPORTED_VERSION);
+
+        List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new ArrayList<>(request.featureUpdates());
+        assertEquals(updates.size(), 2);
+        assertEquals(updates.get(0).upgradeType(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
+        assertEquals(updates.get(1).upgradeType(), FeatureUpdate.UpgradeType.UPGRADE);
+    }
+
+    @Test
+    public void testUpdateFeaturesV1() {
+        UpdateFeaturesRequestData.FeatureUpdateKeyCollection features =
+            new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+
+        features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+            .setFeature("foo")
+            .setMaxVersionLevel((short) 1)
+            .setUpgradeType(FeatureUpdate.UpgradeType.SAFE_DOWNGRADE.code())
+        );
+
+        features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+            .setFeature("bar")
+            .setMaxVersionLevel((short) 3)
+        );
+
+        UpdateFeaturesRequest request = new UpdateFeaturesRequest(
+            new UpdateFeaturesRequestData().setFeatureUpdates(features),
+            UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION
+        );
+
+        ByteBuffer buffer = request.serialize();
+        request = UpdateFeaturesRequest.parse(buffer, UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION);
+
+        List<UpdateFeaturesRequest.FeatureUpdateItem> updates = new ArrayList<>(request.featureUpdates());
+        assertEquals(updates.size(), 2);
+        assertEquals(updates.get(0).upgradeType(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE);
+        assertEquals(updates.get(1).upgradeType(), FeatureUpdate.UpgradeType.UPGRADE);
+
+    }
+
+    @Test
+    public void testUpdateFeaturesV1OldBoolean() {
+        UpdateFeaturesRequestData.FeatureUpdateKeyCollection features =
+            new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+
+        features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+            .setFeature("foo")
+            .setMaxVersionLevel((short) 1)
+            .setAllowDowngrade(true)
+        );
+
+        features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+            .setFeature("bar")
+            .setMaxVersionLevel((short) 3)
+        );
+
+        UpdateFeaturesRequest request = new UpdateFeaturesRequest(
+            new UpdateFeaturesRequestData().setFeatureUpdates(features),
+            UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION
+        );
+        assertThrows(UnsupportedVersionException.class, request::serialize,
+            "This should fail since allowDowngrade is not supported in v1 of this RPC");
+    }
+
 }
diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala
index 4b299652a6..c5c62648f4 100644
--- a/core/src/main/scala/kafka/admin/FeatureCommand.scala
+++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala
@@ -17,374 +17,317 @@
 
 package kafka.admin
 
-import kafka.server.BrokerFeatures
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import kafka.tools.TerseFailure
+import kafka.utils.Exit
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, storeTrue}
+import net.sourceforge.argparse4j.inf.{Namespace, Subparsers}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions}
-import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
+import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
 import org.apache.kafka.common.utils.Utils
-import java.util.Properties
 
+import java.io.File
+import java.util.Properties
 import scala.collection.Seq
-import scala.collection.immutable.ListMap
-import scala.jdk.CollectionConverters._
-import joptsimple.OptionSpec
-
 import scala.concurrent.ExecutionException
+import scala.jdk.CollectionConverters._
 
 object FeatureCommand {
 
   def main(args: Array[String]): Unit = {
-    val opts = new FeatureCommandOptions(args)
-    val featureApis = new FeatureApis(opts)
-    var exitCode = 0
+    val res = mainNoExit(args)
+    Exit.exit(res)
+  }
+
+  // This is used for integration tests in order to avoid killing the test with Exit.exit
+  def mainNoExit(args: Array[String]): Int = {
+    val parser = ArgumentParsers.newArgumentParser("kafka-features")
+      .defaultHelp(true)
+      .description("This tool manages feature flags in Kafka.")
+    parser.addArgument("--bootstrap-server")
+      .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.")
+      .required(true)
+
+    parser.addArgument("--command-config")
+      .`type`(fileType())
+      .help("Property file containing configs to be passed to Admin Client.")
+    val subparsers = parser.addSubparsers().dest("command")
+    addDescribeParser(subparsers)
+    addUpgradeParser(subparsers)
+    addDowngradeParser(subparsers)
+    addDisableParser(subparsers)
+
     try {
-      featureApis.execute()
+      val namespace = parser.parseArgsOrFail(args)
+      val command = namespace.getString("command")
+
+      val commandConfig = namespace.get[File]("command_config")
+      val props = if (commandConfig != null) {
+        if (!commandConfig.exists()) {
+          throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!")
+        }
+        Utils.loadProps(commandConfig.getPath)
+      } else {
+        new Properties()
+      }
+      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server"))
+      val admin = Admin.create(props)
+
+      command match {
+        case "describe" => handleDescribe(namespace, admin)
+        case "upgrade" => handleUpgrade(namespace, admin)
+        case "downgrade" => handleDowngrade(namespace, admin)
+        case "disable" => handleDisable(namespace, admin)
+      }
+      admin.close()
+      0
     } catch {
-      case e: IllegalArgumentException =>
-        printException(e)
-        opts.parser.printHelpOn(System.err)
-        exitCode = 1
-      case _: UpdateFeaturesException =>
-        exitCode = 1
-      case e: ExecutionException =>
-        val cause = if (e.getCause == null) e else e.getCause
-        printException(cause)
-        exitCode = 1
-      case e: Throwable =>
-        printException(e)
-        exitCode = 1
-    } finally {
-      featureApis.close()
-      Exit.exit(exitCode)
+      case e: TerseFailure =>
+        System.err.println(e.getMessage)
+        1
     }
   }
 
-  private def printException(exception: Throwable): Unit = {
-    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
-  }
-}
+  def addDescribeParser(subparsers: Subparsers): Unit = {
+    val describeParser = subparsers.addParser("describe")
+      .help("Describe one or more feature flags.")
 
-class UpdateFeaturesException(message: String) extends RuntimeException(message)
-
-/**
- * A class that provides necessary APIs to bridge feature APIs provided by the Admin client with
- * the requirements of the CLI tool.
- *
- * @param opts the CLI options
- */
-class FeatureApis(private var opts: FeatureCommandOptions) {
-  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
-  private var adminClient = FeatureApis.createAdminClient(opts)
+    val featureArgs = describeParser.addArgumentGroup("Specific Features")
+    featureArgs.addArgument("--feature")
+      .action(append())
+      .help("A specific feature to describe. This option may be repeated for describing multiple feature flags.")
 
-  private def pad(op: String): String = {
-    f"$op%11s"
+    val releaseArgs = describeParser.addArgumentGroup("All Features for release")
+    releaseArgs.addArgument("--release")
   }
 
-  private val addOp = pad("[Add]")
-  private val upgradeOp = pad("[Upgrade]")
-  private val deleteOp = pad("[Delete]")
-  private val downgradeOp = pad("[Downgrade]")
-
-  // For testing only.
-  private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
-    supportedFeatures = newFeatures
+  def addUpgradeParser(subparsers: Subparsers): Unit = {
+    val upgradeParser = subparsers.addParser("upgrade")
+      .help("Upgrade one or more feature flags.")
+
+    val featureArgs = upgradeParser.addArgumentGroup("Upgrade specific features")
+    featureArgs.addArgument("--feature")
+      .action(append())
+      .help("A feature flag to upgrade. This option may be repeated for upgrading multiple feature flags.")
+    featureArgs.addArgument("--version")
+      .`type`(classOf[Short])
+      .help("The version to upgrade to.")
+      .action(append())
+
+    val releaseArgs = upgradeParser.addArgumentGroup("Upgrade to feature level defined for a given release")
+    releaseArgs.addArgument("--release")
+
+    upgradeParser.addArgument("--dry-run")
+      .help("Perform a dry-run of this upgrade operation.")
+      .action(storeTrue())
   }
 
-  // For testing only.
-  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
-    adminClient.close()
-    adminClient = FeatureApis.createAdminClient(newOpts)
-    opts = newOpts
+  def addDowngradeParser(subparsers: Subparsers): Unit = {
+    val downgradeParser = subparsers.addParser("downgrade")
+      .help("Upgrade one or more feature flags.")
+
+    downgradeParser.addArgument("--feature")
+      .help("A feature flag to downgrade. This option may be repeated for downgrade multiple feature flags.")
+      .required(true)
+      .action(append())
+    downgradeParser.addArgument("--version")
+      .`type`(classOf[Short])
+      .help("The version to downgrade to.")
+      .required(true)
+      .action(append())
+    downgradeParser.addArgument("--unsafe")
+      .help("Perform this downgrade even if it considered unsafe. Refer to specific feature flag documentation for details.")
+      .action(storeTrue())
+    downgradeParser.addArgument("--dry-run")
+      .help("Perform a dry-run of this downgrade operation.")
+      .action(storeTrue())
   }
 
-  /**
-   * Describes the supported and finalized features. The request is issued to any of the provided
-   * bootstrap servers.
-   */
-  def describeFeatures(): Unit = {
-    val result = adminClient.describeFeatures.featureMetadata.get
-    val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
-
-    features.toList.sorted.foreach {
-      feature =>
-        val output = new StringBuilder()
-        output.append(s"Feature: $feature")
-
-        val (supportedMinVersion, supportedMaxVersion) = {
-          val supportedVersionRange = result.supportedFeatures.get(feature)
-          if (supportedVersionRange == null) {
-            ("-", "-")
-          } else {
-            (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
-          }
-        }
-        output.append(s"\tSupportedMinVersion: $supportedMinVersion")
-        output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+  def addDisableParser(subparsers: Subparsers): Unit = {
+    val disableParser = subparsers.addParser("disable")
+      .help("Disable one or more feature flags. This is the same as downgrading the version to zero.")
+
+    disableParser.addArgument("--feature")
+      .help("A feature flag to disable. This option may be repeated for disable multiple feature flags.")
+      .required(true)
+      .action(append())
+    disableParser.addArgument("--unsafe")
+      .help("Disable the feature flag(s) even if it considered unsafe. Refer to specific feature flag documentation for details.")
+      .action(storeTrue())
+    disableParser.addArgument("--dry-run")
+      .help("Perform a dry-run of this disable operation.")
+      .action(storeTrue())
+  }
 
-        val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
-          val finalizedVersionRange = result.finalizedFeatures.get(feature)
-          if (finalizedVersionRange == null) {
-            ("-", "-")
-          } else {
-            (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
-          }
-        }
-        output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
-        output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+  def handleDescribe(namespace: Namespace, admin: Admin): Unit = {
+    val featureFilter = parseFeaturesOrRelease(namespace) match {
+      case Neither() => (_: String) => true
+      case Features(featureNames) => (feature: String) => featureNames.contains(feature)
+      case Release(release) =>
+        // Special case, print the versions associated with the given release
+        printReleaseFeatures(release)
+        return
+      case Both() => throw new TerseFailure("Only one of --release or --feature may be specified with describe sub-command.")
+    }
 
-        val epoch = {
-          if (result.finalizedFeaturesEpoch.isPresent) {
-            result.finalizedFeaturesEpoch.get.toString
+    val featureMetadata = admin.describeFeatures().featureMetadata().get()
+    val featureEpoch = featureMetadata.finalizedFeaturesEpoch()
+    val epochString = if (featureEpoch.isPresent) {
+      s"Epoch: ${featureEpoch.get}"
+    } else {
+      "Epoch: -"
+    }
+    val finalized = featureMetadata.finalizedFeatures().asScala
+    featureMetadata.supportedFeatures().asScala.foreach {
+      case (feature, range) =>
+        if (featureFilter.apply(feature)) {
+          if (finalized.contains(feature)) {
+            println(s"Feature: $feature\tSupportedMinVersion: ${range.minVersion()}\t" +
+              s"SupportedMaxVersion: ${range.maxVersion()}\tFinalizedVersionLevel: ${finalized(feature).maxVersionLevel()}\t$epochString")
           } else {
-            "-"
+            println(s"Feature: $feature\tSupportedMinVersion: ${range.minVersion()}\t" +
+              s"SupportedMaxVersion: ${range.maxVersion()}\tFinalizedVersionLevel: -\t$epochString")
           }
         }
-        output.append(s"\tEpoch: $epoch")
-
-        println(output)
     }
   }
 
-  /**
-   * Upgrades all features known to this tool to their highest max version levels. The method may
-   * add new finalized features if they were not finalized previously, but it does not delete
-   * any existing finalized feature. The results of the feature updates are written to STDOUT.
-   *
-   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
-   * updates to STDOUT, without applying them.
-   *
-   * @throws UpdateFeaturesException if at least one of the feature updates failed
-   */
-  def upgradeAllFeatures(): Unit = {
-    val metadata = adminClient.describeFeatures.featureMetadata.get
-    val existingFinalizedFeatures = metadata.finalizedFeatures
-    val updates = supportedFeatures.features.asScala.map {
-      case (feature, targetVersionRange) =>
-        val existingVersionRange = existingFinalizedFeatures.get(feature)
-        if (existingVersionRange == null) {
-          val updateStr =
-            addOp +
-            s"\tFeature: $feature" +
-            s"\tExistingFinalizedMaxVersion: -" +
-            s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
-          (feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
-        } else {
-          if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
-            val updateStr =
-              upgradeOp +
-              s"\tFeature: $feature" +
-              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
-              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
-            (feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
-          } else {
-            (feature, Option.empty)
-          }
-        }
-    }.filter {
-      case(_, updateInfo) => updateInfo.isDefined
-    }.map {
-      case(feature, updateInfo) => (feature, updateInfo.get)
-    }.toMap
+  def printReleaseFeatures(release: String): Unit = {
+    println(s"Default feature versions for release $release:")
+  }
 
-    if (updates.nonEmpty) {
-      maybeApplyFeatureUpdates(updates)
+  def handleUpgrade(namespace: Namespace, admin: Admin): Unit = {
+    val featuresToUpgrade = parseFeaturesOrRelease(namespace) match {
+      case Features(featureNames) => parseVersions(featureNames, namespace)
+      case Release(release) => featuresForRelease(release)
+      case Neither() => throw new TerseFailure("Must specify either --release or at least one --feature and --version with upgrade sub-command.")
+      case Both() => throw new TerseFailure("Cannot specify both --release and --feature with upgrade sub-command.")
     }
-  }
 
-  /**
-   * Downgrades existing finalized features to the highest max version levels known to this tool.
-   * The method may delete existing finalized features if they are no longer seen to be supported,
-   * but it does not add a feature that was not finalized previously. The results of the feature
-   * updates are written to STDOUT.
-   *
-   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
-   * updates to STDOUT, without applying them.
-   *
-   * @throws UpdateFeaturesException if at least one of the feature updates failed
-   */
-  def downgradeAllFeatures(): Unit = {
-    val metadata = adminClient.describeFeatures.featureMetadata.get
-    val existingFinalizedFeatures = metadata.finalizedFeatures
-    val supportedFeaturesMap = supportedFeatures.features
-    val updates = existingFinalizedFeatures.asScala.map {
-      case (feature, existingVersionRange) =>
-        val targetVersionRange = supportedFeaturesMap.get(feature)
-        if (targetVersionRange == null) {
-          val updateStr =
-            deleteOp +
-            s"\tFeature: $feature" +
-            s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
-            s"\tNewFinalizedMaxVersion: -"
-          (feature, Some(updateStr, new FeatureUpdate(0, true)))
-        } else {
-          if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
-            val updateStr =
-              downgradeOp +
-              s"\tFeature: $feature" +
-              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
-              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
-            (feature, Some(updateStr, new FeatureUpdate(targetVersionRange.max, true)))
-          } else {
-            (feature, Option.empty)
-          }
-        }
-    }.filter {
-      case(_, updateInfo) => updateInfo.isDefined
-    }.map {
-      case(feature, updateInfo) => (feature, updateInfo.get)
-    }.toMap
+    val dryRun = namespace.getBoolean("dry_run")
+    val updateResult = admin.updateFeatures(featuresToUpgrade.map { case (feature, version) =>
+      feature -> new FeatureUpdate(version, UpgradeType.UPGRADE)
+    }.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
+    handleUpdateFeaturesResponse(updateResult, featuresToUpgrade, dryRun, "upgrade")
+  }
 
-    if (updates.nonEmpty) {
-      maybeApplyFeatureUpdates(updates)
+  def handleDowngrade(namespace: Namespace, admin: Admin): Unit = {
+    val featuresToDowngrade = parseFeaturesOrRelease(namespace) match {
+      case Features(featureNames) => parseVersions(featureNames, namespace)
+      case Neither() => throw new TerseFailure("Must specify at least one --feature and --version with downgrade sub-command.")
+      case _ => throw new IllegalStateException()
     }
-  }
 
-  /**
-   * Applies the provided feature updates. If the --dry-run CLI option is provided, the method
-   * only prints the expected feature updates to STDOUT without applying them.
-   *
-   * @param updates the feature updates to be applied via the admin client
-   *
-   * @throws UpdateFeaturesException if at least one of the feature updates failed
-   */
-  private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
-    if (opts.hasDryRunOption) {
-      println("Expected feature updates:" + ListMap(
-        updates
-          .toSeq
-          .sortBy { case(feature, _) => feature} :_*)
-          .map { case(_, (updateStr, _)) => updateStr}
-          .mkString("\n"))
-    } else {
-      val result = adminClient.updateFeatures(
-        updates
-          .map { case(feature, (_, update)) => (feature, update)}
-          .asJava,
-        new UpdateFeaturesOptions())
-      val resultSortedByFeature = ListMap(
-        result
-          .values
-          .asScala
-          .toSeq
-          .sortBy { case(feature, _) => feature} :_*)
-      val failures = resultSortedByFeature.map {
-        case (feature, updateFuture) =>
-          val (updateStr, _) = updates(feature)
-          try {
-            updateFuture.get
-            println(updateStr + "\tResult: OK")
-            0
-          } catch {
-            case e: ExecutionException =>
-              val cause = if (e.getCause == null) e else e.getCause
-              println(updateStr + "\tResult: FAILED due to " + cause)
-              1
-            case e: Throwable =>
-              println(updateStr + "\tResult: FAILED due to " + e)
-              1
-          }
-      }.sum
-      if (failures > 0) {
-        throw new UpdateFeaturesException(s"$failures feature updates failed!")
+    val dryRun = namespace.getBoolean("dry_run")
+    val unsafe = namespace.getBoolean("unsafe")
+    val updateResult = admin.updateFeatures(featuresToDowngrade.map { case (feature, version) =>
+      if (unsafe) {
+        feature -> new FeatureUpdate(version, UpgradeType.UNSAFE_DOWNGRADE)
+      } else {
+        feature -> new FeatureUpdate(version, UpgradeType.SAFE_DOWNGRADE)
       }
-    }
+    }.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
+
+    handleUpdateFeaturesResponse(updateResult, featuresToDowngrade, dryRun, "downgrade")
   }
 
-  def execute(): Unit = {
-    if (opts.hasDescribeOption) {
-      describeFeatures()
-    } else if (opts.hasUpgradeAllOption) {
-      upgradeAllFeatures()
-    } else if (opts.hasDowngradeAllOption) {
-      downgradeAllFeatures()
-    } else {
-      throw new IllegalStateException("Unexpected state: no CLI command could be executed.")
+  def handleDisable(namespace: Namespace, admin: Admin): Unit = {
+    val featuresToDisable = parseFeaturesOrRelease(namespace) match {
+      case Features(featureNames) => featureNames
+      case Neither() => throw new TerseFailure("Must specify at least one --feature and --version with downgrade sub-command.")
+      case _ => throw new IllegalStateException()
     }
-  }
 
-  def close(): Unit = {
-    adminClient.close()
+    val dryRun = namespace.getBoolean("dry_run")
+    val unsafe = namespace.getBoolean("unsafe")
+    val updateResult = admin.updateFeatures(featuresToDisable.map { feature =>
+      if (unsafe) {
+        feature -> new FeatureUpdate(0.toShort, UpgradeType.UNSAFE_DOWNGRADE)
+      } else {
+        feature -> new FeatureUpdate(0.toShort, UpgradeType.SAFE_DOWNGRADE)
+      }
+    }.toMap.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
+
+    handleUpdateFeaturesResponse(updateResult, featuresToDisable.map {
+      feature => feature -> 0.toShort
+    }.toMap, dryRun, "disable")
   }
-}
 
-class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
-  private val bootstrapServerOpt = parser.accepts(
-      "bootstrap-server",
-      "REQUIRED: A comma-separated list of host:port pairs to use for establishing the connection" +
-      " to the Kafka cluster.")
-      .withRequiredArg
-      .describedAs("server to connect to")
-      .ofType(classOf[String])
-  private val commandConfigOpt = parser.accepts(
-    "command-config",
-    "Property file containing configs to be passed to Admin Client." +
-    " This is used with --bootstrap-server option when required.")
-    .withOptionalArg
-    .describedAs("command config property file")
-    .ofType(classOf[String])
-  private val describeOpt = parser.accepts(
-    "describe",
-    "Describe supported and finalized features from a random broker.")
-  private val upgradeAllOpt = parser.accepts(
-    "upgrade-all",
-    "Upgrades all finalized features to the maximum version levels known to the tool." +
-    " This command finalizes new features known to the tool that were never finalized" +
-    " previously in the cluster, but it is guaranteed to not delete any existing feature.")
-  private val downgradeAllOpt = parser.accepts(
-    "downgrade-all",
-    "Downgrades all finalized features to the maximum version levels known to the tool." +
-    " This command deletes unknown features from the list of finalized features in the" +
-    " cluster, but it is guaranteed to not add a new feature.")
-  private val dryRunOpt = parser.accepts(
-    "dry-run",
-    "Performs a dry-run of upgrade/downgrade mutations to finalized feature without applying them.")
-
-  options = parser.parse(args : _*)
-
-  checkArgs()
-
-  def has(builder: OptionSpec[_]): Boolean = options.has(builder)
-
-  def hasDescribeOption: Boolean = has(describeOpt)
-
-  def hasDryRunOption: Boolean = has(dryRunOpt)
-
-  def hasUpgradeAllOption: Boolean = has(upgradeAllOpt)
-
-  def hasDowngradeAllOption: Boolean = has(downgradeAllOpt)
-
-  def commandConfig: Properties = {
-    if (has(commandConfigOpt))
-      Utils.loadProps(options.valueOf(commandConfigOpt))
-    else
-      new Properties()
+  def handleUpdateFeaturesResponse(updateResult: UpdateFeaturesResult,
+                                   updatedFeatures: Map[String, Short],
+                                   dryRun: Boolean,
+                                   op: String): Unit = {
+    val errors = updateResult.values().asScala.map { case (feature, future) =>
+      try {
+        future.get()
+        feature -> None
+      } catch {
+        case e: ExecutionException => feature -> Some(e.getCause)
+        case t: Throwable => feature -> Some(t)
+      }
+    }
+
+    errors.foreach { case (feature, maybeThrowable) =>
+      if (maybeThrowable.isDefined) {
+        if (dryRun) {
+          System.out.println(s"Can not $op feature '$feature' to ${updatedFeatures(feature)}. ${maybeThrowable.get.getMessage}")
+        } else {
+          System.out.println(s"Could not $op feature '$feature' to ${updatedFeatures(feature)}. ${maybeThrowable.get.getMessage}")
+        }
+      } else {
+        if (dryRun) {
+          System.out.println(s"Feature '$feature' can be ${op}d to ${updatedFeatures(feature)}.")
+        } else {
+          System.out.println(s"Feature '$feature' was ${op}d to ${updatedFeatures(feature)}.")
+        }
+      }
+    }
   }
 
-  def bootstrapServers: String = options.valueOf(bootstrapServerOpt)
+  sealed trait ReleaseOrFeatures { }
+  case class Neither() extends ReleaseOrFeatures
+  case class Release(release: String) extends ReleaseOrFeatures
+  case class Features(featureNames: Seq[String]) extends ReleaseOrFeatures
+  case class Both() extends ReleaseOrFeatures
+
+  def parseFeaturesOrRelease(namespace: Namespace): ReleaseOrFeatures = {
+    val release = namespace.getString("release")
+    val features = namespace.getList[String]("feature").asScala
+
+    if (release != null && features != null) {
+      Both()
+    } else if (release == null && features == null) {
+      Neither()
+    } else if (release != null) {
+      Release(release)
+    } else {
+      Features(features)
+    }
+  }
 
-  def checkArgs(): Unit = {
-    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool describes and updates finalized features.")
-    val numActions = Seq(describeOpt, upgradeAllOpt, downgradeAllOpt).count(has)
-    if (numActions != 1) {
-      CommandLineUtils.printUsageAndDie(
-        parser,
-        "Command must include exactly one action: --describe, --upgrade-all, --downgrade-all.")
+  def parseVersions(features: Seq[String], namespace: Namespace): Map[String, Short] = {
+    val versions = namespace.getList[Short]("version").asScala
+    if (versions == null) {
+      throw new TerseFailure("Must specify --version when using --feature argument(s).")
     }
-    CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
-    if (hasDryRunOption && !hasUpgradeAllOption && !hasDowngradeAllOption) {
-      CommandLineUtils.printUsageAndDie(
-        parser,
-        "Command can contain --dry-run option only when either --upgrade-all or --downgrade-all actions are provided.")
+    if (versions.size != features.size) {
+      if (versions.size > features.size) {
+        throw new TerseFailure("Too many --version arguments given. For each --feature argument there should be one --version argument.")
+      } else {
+        throw new TerseFailure("Too many --feature arguments given. For each --feature argument there should be one --version argument.")
+      }
     }
+    features.zip(versions).map { case (feature, version) =>
+      feature -> version
+    }.toMap
+  }
+
+  def defaultFeatures(): Map[String, Short] = {
+    Map.empty
   }
-}
 
-object FeatureApis {
-  private def createAdminClient(opts: FeatureCommandOptions): Admin = {
-    val props = new Properties()
-    props.putAll(opts.commandConfig)
-    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServers)
-    Admin.create(props)
+  def featuresForRelease(release: String): Map[String, Short] = {
+    Map.empty
   }
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 7aeeded87f..c29d937ce0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -33,12 +33,13 @@ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zk.TopicZNode.TopicIdReplicaAssignment
 import kafka.zk.{FeatureZNodeStatus, _}
 import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
+import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
 import org.apache.kafka.common.ElectionType
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
-import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterPartitionRequestData, AlterPartitionResponseData, UpdateFeaturesRequestData}
+import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterPartitionRequestData, AlterPartitionResponseData}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
@@ -1928,8 +1929,9 @@ class KafkaController(val config: KafkaConfig,
    *
    * @return         the new FinalizedVersionRange or error, as described above.
    */
-  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
-    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequest.FeatureUpdateItem):
+      Either[FinalizedVersionRange, ApiError] = {
+    if (update.isDeleteRequest) {
       throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
     }
 
@@ -1941,7 +1943,7 @@ class KafkaController(val config: KafkaConfig,
     } else {
       var newVersionRange: FinalizedVersionRange = null
       try {
-        newVersionRange = new FinalizedVersionRange(supportedVersionRange.min, update.maxVersionLevel)
+        newVersionRange = new FinalizedVersionRange(update.versionLevel(), update.versionLevel())
       } catch {
         case _: IllegalArgumentException => {
           // This exception means the provided maxVersionLevel is invalid. It is handled below
@@ -1951,7 +1953,7 @@ class KafkaController(val config: KafkaConfig,
       if (newVersionRange == null) {
         Right(new ApiError(Errors.INVALID_REQUEST,
           "Could not apply finalized feature update because the provided" +
-          s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+          s" maxVersionLevel:${update.versionLevel} is lower than the" +
           s" supported minVersion:${supportedVersionRange.min}."))
       } else {
         val newFinalizedFeature =
@@ -1985,9 +1987,9 @@ class KafkaController(val config: KafkaConfig,
    * @return                       the new FinalizedVersionRange to be updated into ZK or error
    *                               as described above.
    */
-  private def validateFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey,
+  private def validateFeatureUpdate(update: UpdateFeaturesRequest.FeatureUpdateItem,
                                     existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = {
-    def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = {
+    def newVersionRangeOrError(update: UpdateFeaturesRequest.FeatureUpdateItem): Either[Option[FinalizedVersionRange], ApiError] = {
       newFinalizedVersionRangeOrIncompatibilityError(update)
         .fold(versionRange => Left(Some(versionRange)), error => Right(error))
     }
@@ -1995,9 +1997,12 @@ class KafkaController(val config: KafkaConfig,
     if (update.feature.isEmpty) {
       // Check that the feature name is not empty.
       Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty."))
+    } else if (update.upgradeType.equals(UpgradeType.UNKNOWN)) {
+      Right(new ApiError(Errors.INVALID_REQUEST, "Received unknown upgrade type."))
     } else {
+
       // We handle deletion requests separately from non-deletion requests.
-      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      if (update.isDeleteRequest) {
         if (existingVersionRange.isEmpty) {
           // Disallow deletion of a non-existing finalized feature.
           Right(new ApiError(Errors.INVALID_REQUEST,
@@ -2005,39 +2010,33 @@ class KafkaController(val config: KafkaConfig,
         } else {
           Left(Option.empty)
         }
-      } else if (update.maxVersionLevel() < 1) {
-        // Disallow deletion of a finalized feature without allowDowngrade flag set.
+      } else if (update.versionLevel() < 1) {
+        // Disallow deletion of a finalized feature without SAFE downgrade type.
         Right(new ApiError(Errors.INVALID_REQUEST,
-                           s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" +
-                           s" than 1 without setting the allowDowngrade flag to true in the request."))
+                           s"Can not provide maxVersionLevel: ${update.versionLevel} less" +
+                           s" than 1 without setting the SAFE downgradeType in the request."))
       } else {
         existingVersionRange.map(existing =>
-          if (update.maxVersionLevel == existing.max) {
+          if (update.versionLevel == existing.max) {
             // Disallow a case where target maxVersionLevel matches existing maxVersionLevel.
             Right(new ApiError(Errors.INVALID_REQUEST,
-                               s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" +
+                               s"Can not ${if (update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) "downgrade" else "upgrade"}" +
                                s" a finalized feature from existing maxVersionLevel:${existing.max}" +
                                " to the same value."))
-          } else if (update.maxVersionLevel < existing.max && !update.allowDowngrade) {
-            // Disallow downgrade of a finalized feature without the allowDowngrade flag set.
+          } else if (update.versionLevel < existing.max && !update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) {
+            // Disallow downgrade of a finalized feature without the downgradeType set.
             Right(new ApiError(Errors.INVALID_REQUEST,
                                s"Can not downgrade finalized feature from existing" +
                                s" maxVersionLevel:${existing.max} to provided" +
-                               s" maxVersionLevel:${update.maxVersionLevel} without setting the" +
-                               " allowDowngrade flag in the request."))
-          } else if (update.allowDowngrade && update.maxVersionLevel > existing.max) {
-            // Disallow a request that sets allowDowngrade flag without specifying a
+                               s" maxVersionLevel:${update.versionLevel} without setting the" +
+                               " downgradeType to SAFE in the request."))
+          } else if (!update.upgradeType.equals(UpgradeType.UPGRADE) && update.versionLevel > existing.max) {
+            // Disallow a request that sets downgradeType without specifying a
             // maxVersionLevel that's lower than the existing maxVersionLevel.
             Right(new ApiError(Errors.INVALID_REQUEST,
-                               s"When the allowDowngrade flag set in the request, the provided" +
-                               s" maxVersionLevel:${update.maxVersionLevel} can not be greater than" +
+                               s"When the downgradeType is set to SAFE set in the request, the provided" +
+                               s" maxVersionLevel:${update.versionLevel} can not be greater than" +
                                s" existing maxVersionLevel:${existing.max}."))
-          } else if (update.maxVersionLevel < existing.min) {
-            // Disallow downgrade of a finalized feature below the existing finalized
-            // minVersionLevel.
-            Right(new ApiError(Errors.INVALID_REQUEST,
-                               s"Can not downgrade finalized feature to maxVersionLevel:${update.maxVersionLevel}" +
-                               s" because it's lower than the existing minVersionLevel:${existing.min}."))
           } else {
             newVersionRangeOrError(update)
           }
@@ -2057,7 +2056,7 @@ class KafkaController(val config: KafkaConfig,
 
   private def processFeatureUpdatesWithActiveController(request: UpdateFeaturesRequest,
                                                         callback: UpdateFeaturesCallback): Unit = {
-    val updates = request.data.featureUpdates
+    val updates = request.featureUpdates
     val existingFeatures = featureCache.get
       .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
       .getOrElse(Map[String, FinalizedVersionRange]())
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index dd84f9e73e..4f0fe37989 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -19,8 +19,8 @@ package kafka.server
 
 import kafka.utils.Logging
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
-import org.apache.kafka.common.feature.Features._
 
+import java.util
 import scala.jdk.CollectionConverters._
 
 /**
@@ -32,7 +32,9 @@ import scala.jdk.CollectionConverters._
 class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange]) {
   // For testing only.
   def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
-    supportedFeatures = newFeatures
+    val combined = new util.HashMap[String, SupportedVersionRange](supportedFeatures.features())
+    combined.putAll(newFeatures.features())
+    supportedFeatures = Features.supportedFeatures(combined)
   }
 
   /**
@@ -43,7 +45,7 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
     Features.finalizedFeatures(
       supportedFeatures.features.asScala.map {
         case(name, versionRange) => (
-          name, new FinalizedVersionRange(versionRange.min, versionRange.max))
+          name, new FinalizedVersionRange(versionRange.max, versionRange.max))
       }.asJava)
   }
 
@@ -70,9 +72,7 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
 object BrokerFeatures extends Logging {
 
   def createDefault(): BrokerFeatures = {
-    // The arguments are currently empty, but, in the future as we define features we should
-    // populate the required values here.
-    new BrokerFeatures(emptySupportedFeatures)
+    new BrokerFeatures(Features.emptySupportedFeatures())
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 05aa0f954c..a5166e5e66 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -33,6 +33,7 @@ import kafka.security.CredentialProvider
 import kafka.server.KafkaRaftServer.ControllerRole
 import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
 import kafka.utils.{CoreUtils, KafkaScheduler}
+import org.apache.kafka.common.feature.SupportedVersionRange
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
 import org.apache.kafka.common.metrics.Metrics
@@ -80,8 +81,7 @@ class BrokerServer(
   val metrics: Metrics,
   val threadNamePrefix: Option[String],
   val initialOfflineDirs: Seq[String],
-  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
-  val supportedFeatures: util.Map[String, VersionRange]
+  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
 ) extends KafkaBroker {
 
   override def brokerState: BrokerState = lifecycleManager.state
@@ -141,10 +141,6 @@ class BrokerServer(
 
   @volatile var brokerTopicStats: BrokerTopicStats = null
 
-  val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
-
-  val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
-
   val clusterId: String = metaProps.clusterId
 
   var metadataSnapshotter: Option[BrokerMetadataSnapshotter] = None
@@ -153,6 +149,8 @@ class BrokerServer(
 
   var metadataPublisher: BrokerMetadataPublisher = null
 
+  val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
+
   def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
 
   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
@@ -223,6 +221,8 @@ class BrokerServer(
       clientToControllerChannelManager.start()
       forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)
 
+      val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
+
       val apiVersionManager = ApiVersionManager(
         ListenerType.BROKER,
         config,
@@ -332,10 +332,16 @@ class BrokerServer(
           setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
           setSecurityProtocol(ep.securityProtocol.id))
       }
+
+      val featuresRemapped = brokerFeatures.supportedFeatures.features().asScala.map {
+        case (k: String, v: SupportedVersionRange) =>
+          k -> VersionRange.of(v.min, v.max)
+      }.asJava
+
       lifecycleManager.start(() => metadataListener.highestMetadataOffset,
         BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
           "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
-        metaProps.clusterId, networkListeners, supportedFeatures)
+        metaProps.clusterId, networkListeners, featuresRemapped)
 
       // Register a listener with the Raft layer to receive metadata event notifications
       raftManager.register(metadataListener)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 8b006dc025..e4b40bc603 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -22,7 +22,6 @@ import java.util.Collections
 import java.util.Map.Entry
 import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
 import java.util.concurrent.{CompletableFuture, ExecutionException}
-
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
@@ -48,7 +47,7 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, Uuid}
 import org.apache.kafka.controller.Controller
-import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange}
+import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 
@@ -62,7 +61,6 @@ class ControllerApis(val requestChannel: RequestChannel,
                      val authorizer: Option[Authorizer],
                      val quotas: QuotaManagers,
                      val time: Time,
-                     val supportedFeatures: Map[String, VersionRange],
                      val controller: Controller,
                      val raftManager: RaftManager[ApiMessageAndVersion],
                      val config: KafkaConfig,
@@ -108,6 +106,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
         case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
         case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
+        case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
         case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
       }
     } catch {
@@ -784,4 +783,18 @@ class ControllerApis(val requestChannel: RequestChannel,
         }
       })
   }
+
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+    val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+    authHelper.authorizeClusterOperation(request, ALTER)
+    controller.updateFeatures(updateFeaturesRequest.data)
+      .whenComplete((response, exception) => {
+        if (exception != null) {
+          requestHelper.handleError(request, exception)
+        } else {
+          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+            new UpdateFeaturesResponse(response.setThrottleTimeMs(requestThrottleMs)))
+        }
+      })
+  }
 }
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 072a7721f4..049ad5b8c2 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -37,8 +37,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
-import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
-import org.apache.kafka.metadata.{KafkaConfigSchema, VersionRange}
+import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
+import org.apache.kafka.metadata.KafkaConfigSchema
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.server.authorizer.Authorizer
@@ -79,7 +79,6 @@ class ControllerServer(
   var createTopicPolicy: Option[CreateTopicPolicy] = None
   var alterConfigPolicy: Option[AlterConfigPolicy] = None
   var controller: Controller = null
-  val supportedFeatures: Map[String, VersionRange] = Map()
   var quotaManagers: QuotaManagers = null
   var controllerApis: ControllerApis = null
   var controllerApisHandlerPool: KafkaRequestHandlerPool = null
@@ -161,6 +160,8 @@ class ControllerServer(
       alterConfigPolicy = Option(config.
         getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
 
+      val quorumFeatures = QuorumFeatures.create(config.nodeId, QuorumFeatures.defaultFeatureMap())
+
       val controllerBuilder = {
         val leaderImbalanceCheckIntervalNs = if (config.autoLeaderRebalanceEnable) {
           OptionalLong.of(TimeUnit.NANOSECONDS.convert(config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS))
@@ -173,6 +174,7 @@ class ControllerServer(
           setThreadNamePrefix(threadNamePrefixAsString).
           setConfigSchema(configSchema).
           setRaftClient(raftManager.client).
+          setQuorumFeatures(quorumFeatures).
           setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
           setDefaultNumPartitions(config.numPartitions.intValue()).
           setIsLeaderRecoverySupported(config.interBrokerProtocolVersion >= KAFKA_3_2_IV0).
@@ -198,7 +200,6 @@ class ControllerServer(
         authorizer,
         quotaManagers,
         time,
-        supportedFeatures,
         controller,
         raftManager,
         config,
diff --git a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
index 88addb76c4..ee7337653c 100644
--- a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
+++ b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
@@ -138,10 +138,10 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
     val newFeatures = new util.HashMap[String, FinalizedVersionRange]()
     newFeatures.putAll(features.features.features())
     featuresDelta.changes().entrySet().forEach { e =>
-      e.getValue().asScala match {
+      e.getValue.asScala match {
         case None => newFeatures.remove(e.getKey)
-        case Some(feature) => newFeatures.put(e.getKey,
-          new FinalizedVersionRange(feature.min(), feature.max()))
+        case Some(version) => newFeatures.put(e.getKey,
+          new FinalizedVersionRange(version, version))
       }
     }
     featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(Features.finalizedFeatures(
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index d9629376fd..a0dd19559c 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -86,8 +86,7 @@ class KafkaRaftServer(
       metrics,
       threadNamePrefix,
       offlineDirs,
-      controllerQuorumVotersFuture,
-      Server.SUPPORTED_FEATURES
+      controllerQuorumVotersFuture
     ))
   } else {
     None
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
index c395df47e6..5d902c5831 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -16,15 +16,12 @@
  */
 package kafka.server
 
-import java.util.Collections
 import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.VersionRange
 
-import scala.jdk.CollectionConverters._
 
 trait Server {
   def startup(): Unit
@@ -99,7 +96,4 @@ object Server {
   case object STARTING extends ProcessStatus
   case object STARTED extends ProcessStatus
   case object SHUTTING_DOWN extends ProcessStatus
-
-  val SUPPORTED_FEATURES = Collections.
-    unmodifiableMap[String, VersionRange](Map[String, VersionRange]().asJava)
 }
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
index b4bfb0dde6..d3bda07361 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -41,6 +41,8 @@ import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.UpdateFeaturesRequestData;
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -49,7 +51,7 @@ import org.apache.kafka.controller.Controller;
 import org.apache.kafka.controller.ResultOrError;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
-import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 
@@ -246,7 +248,7 @@ public class MockController implements Controller {
     }
 
     @Override
-    public CompletableFuture<FeatureMapAndEpoch> finalizedFeatures() {
+    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures() {
         throw new UnsupportedOperationException();
     }
 
@@ -349,6 +351,11 @@ public class MockController implements Controller {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(UpdateFeaturesRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
             createPartitions(long deadlineNs, List<CreatePartitionsTopic> topicList) {
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index cbfe4ff34e..2263b09116 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -24,7 +24,6 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaRaftServer;
 import kafka.server.MetaProperties;
-import kafka.server.Server;
 import kafka.tools.StorageTool;
 import kafka.utils.Logging;
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -238,8 +237,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         new Metrics(),
                         Option.apply(threadNamePrefix),
                         JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(),
-                        connectFutureManager.future,
-                        Server.SUPPORTED_FEATURES()
+                        connectFutureManager.future
                     );
                     brokers.put(node.id(), broker);
                     raftManagers.put(node.id(), raftManager);
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index c329805e2c..9f00c0564c 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -85,8 +85,7 @@ class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndV
       metrics = new Metrics(),
       threadNamePrefix = Some("Broker%02d_".format(config.nodeId)),
       initialOfflineDirs = Seq(),
-      controllerQuorumVotersFuture = controllerQuorumVotersFuture,
-      supportedFeatures = Collections.emptyMap())
+      controllerQuorumVotersFuture = controllerQuorumVotersFuture)
     if (startup) broker.startup()
     broker
   }
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
index 93c22eb7a0..d757148182 100644
--- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils
 
 import java.util.Properties
 
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, assertThrows}
+import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Test
 
 class FeatureCommandTest extends BaseRequestTest {
@@ -75,167 +75,37 @@ class FeatureCommandTest extends BaseRequestTest {
   @Test
   def testDescribeFeaturesSuccess(): Unit = {
     updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
-    val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--describe")))
-    featureApis.setSupportedFeatures(defaultSupportedFeatures)
-    try {
-      val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
-      val expectedInitialDescribeOutput =
-        "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n" +
-        "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n"
-      assertEquals(expectedInitialDescribeOutput, initialDescribeOutput)
-      featureApis.upgradeAllFeatures()
-      val finalDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
-      val expectedFinalDescribeOutput =
-        "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 3\tEpoch: 1\n" +
-        "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 5\tEpoch: 1\n"
-      assertEquals(expectedFinalDescribeOutput, finalDescribeOutput)
-    } finally {
-      featureApis.close()
-    }
-  }
 
-  /**
-   * Tests if the FeatureApis#upgradeAllFeatures API works as expected during a success case.
-   */
-  @Test
-  def testUpgradeAllFeaturesSuccess(): Unit = {
-    val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
-    val featureApis = new FeatureApis(upgradeOpts)
-    try {
-      // Step (1):
-      // - Update the supported features across all brokers.
-      // - Upgrade non-existing feature_1 to maxVersionLevel: 2.
-      // - Verify results.
-      val initialSupportedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
-      updateSupportedFeaturesInAllBrokers(initialSupportedFeatures)
-      featureApis.setSupportedFeatures(initialSupportedFeatures)
-      var output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
-      var expected =
-        "      [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 2\tResult: OK\n"
-      assertEquals(expected, output)
+    val initialDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
+    val expectedInitialDescribeOutputs = Seq(
+      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: -",
+      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: -"
+    )
 
-      // Step (2):
-      // - Update the supported features across all brokers.
-      // - Upgrade existing feature_1 to maxVersionLevel: 3.
-      // - Upgrade non-existing feature_2 to maxVersionLevel: 5.
-      // - Verify results.
-      updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
-      featureApis.setSupportedFeatures(defaultSupportedFeatures)
-      output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
-      expected =
-        "  [Upgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: 3\tResult: OK\n" +
-        "      [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 5\tResult: OK\n"
-      assertEquals(expected, output)
-
-      // Step (3):
-      // - Perform an upgrade of all features again.
-      // - Since supported features have not changed, expect that the above action does not yield
-      //   any results.
-      output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
-      assertTrue(output.isEmpty)
-      featureApis.setOptions(upgradeOpts)
-      output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
-      assertTrue(output.isEmpty)
-    } finally {
-      featureApis.close()
+    expectedInitialDescribeOutputs.foreach { expectedOutput =>
+      assertTrue(initialDescribeOutput.contains(expectedOutput))
     }
-  }
-
-  /**
-   * Tests if the FeatureApis#downgradeAllFeatures API works as expected during a success case.
-   */
-  @Test
-  def testDowngradeFeaturesSuccess(): Unit = {
-    val downgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--downgrade-all"))
-    val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
-    val featureApis = new FeatureApis(upgradeOpts)
-    try {
-      // Step (1):
-      // - Update the supported features across all brokers.
-      // - Upgrade non-existing feature_1 to maxVersionLevel: 3.
-      // - Upgrade non-existing feature_2 to maxVersionLevel: 5.
-      updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
-      featureApis.setSupportedFeatures(defaultSupportedFeatures)
-      featureApis.upgradeAllFeatures()
-
-      // Step (2):
-      // - Downgrade existing feature_1 to maxVersionLevel: 2.
-      // - Delete feature_2 since it is no longer supported by the FeatureApis object.
-      // - Verify results.
-      val downgradedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
-      featureApis.setSupportedFeatures(downgradedFeatures)
-      featureApis.setOptions(downgradeOpts)
-      var output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
-      var expected =
-        "[Downgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 3\tNewFinalizedMaxVersion: 2\tResult: OK\n" +
-        "   [Delete]\tFeature: feature_2\tExistingFinalizedMaxVersion: 5\tNewFinalizedMaxVersion: -\tResult: OK\n"
-      assertEquals(expected, output)
 
-      // Step (3):
-      // - Perform a downgrade of all features again.
-      // - Since supported features have not changed, expect that the above action does not yield
-      //   any results.
-      updateSupportedFeaturesInAllBrokers(downgradedFeatures)
-      output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
-      assertTrue(output.isEmpty)
-
-      // Step (4):
-      // - Delete feature_1 since it is no longer supported by the FeatureApis object.
-      // - Verify results.
-      featureApis.setSupportedFeatures(Features.emptySupportedFeatures())
-      output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
-      expected =
-        "   [Delete]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: -\tResult: OK\n"
-      assertEquals(expected, output)
-    } finally {
-      featureApis.close()
+    FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "upgrade",
+      "--feature", "feature_1", "--version", "3", "--feature", "feature_2", "--version", "5"))
+    val upgradeDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
+    val expectedUpgradeDescribeOutput = Seq(
+      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: 3",
+      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: 5"
+    )
+    expectedUpgradeDescribeOutput.foreach { expectedOutput =>
+      assertTrue(upgradeDescribeOutput.contains(expectedOutput))
     }
-  }
-
-  /**
-   * Tests if the FeatureApis#upgradeAllFeatures API works as expected during a partial failure case.
-   */
-  @Test
-  def testUpgradeFeaturesFailure(): Unit = {
-    val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", bootstrapServers(), "--upgrade-all"))
-    val featureApis = new FeatureApis(upgradeOpts)
-    try {
-      // Step (1): Update the supported features across all brokers.
-      updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
 
-      // Step (2):
-      // - Intentionally setup the FeatureApis object such that it contains incompatible target
-      //   features (viz. feature_2 and feature_3).
-      // - Upgrade non-existing feature_1 to maxVersionLevel: 4. Expect the operation to fail with
-      //   an incompatibility failure.
-      // - Upgrade non-existing feature_2 to maxVersionLevel: 5. Expect the operation to succeed.
-      // - Upgrade non-existing feature_3 to maxVersionLevel: 3. Expect the operation to fail
-      //   since the feature is not supported.
-      val targetFeaturesWithIncompatibilities =
-        Features.supportedFeatures(
-          Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 4)),
-                      Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5)),
-                      Utils.mkEntry("feature_3", new SupportedVersionRange(1, 3))))
-      featureApis.setSupportedFeatures(targetFeaturesWithIncompatibilities)
-      val output = TestUtils.grabConsoleOutput({
-        val exception = assertThrows(classOf[UpdateFeaturesException], () => featureApis.upgradeAllFeatures())
-        assertEquals("2 feature updates failed!", exception.getMessage)
-      })
-      val expected =
-        "      [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -" +
-        "\tNewFinalizedMaxVersion: 4\tResult: FAILED due to" +
-        " org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
-        " feature update because brokers were found to have incompatible versions for the" +
-        " feature.\n" +
-        "      [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -" +
-        "\tNewFinalizedMaxVersion: 5\tResult: OK\n" +
-        "      [Add]\tFeature: feature_3\tExistingFinalizedMaxVersion: -" +
-        "\tNewFinalizedMaxVersion: 3\tResult: FAILED due to" +
-        " org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
-        " feature update because the provided feature is not supported.\n"
-      assertEquals(expected, output)
-    } finally {
-      featureApis.close()
+    FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "downgrade",
+      "--feature", "feature_1", "--version", "2", "--feature", "feature_2", "--version", "2"))
+    val downgradeDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
+    val expectedFinalDescribeOutput = Seq(
+      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: 2",
+      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: 2"
+    )
+    expectedFinalDescribeOutput.foreach { expectedOutput =>
+      assertTrue(downgradeDescribeOutput.contains(expectedOutput))
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
index c4cc52c27c..10d69e2cd6 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{Disabled, Test}
 
 import scala.jdk.CollectionConverters._
 
@@ -89,6 +89,7 @@ class BrokerFeaturesTest {
   }
 
   @Test
+  @Disabled("Need to remove or rewrite this test after we fully remove FinalizedVersionRange")
   def testDefaultFinalizedFeatures(): Unit = {
     val brokerFeatures = BrokerFeatures.createDefault()
     val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 65580dc2be..5fcf763d7f 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -101,7 +101,6 @@ class ControllerApisTest {
       authorizer,
       quotas,
       time,
-      Map.empty,
       controller,
       raftManager,
       new KafkaConfig(props),
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 43ba5bae48..0b38a02a17 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -763,7 +763,7 @@ class RequestQuotaTest extends BaseRequestTest {
 
 object RequestQuotaTest {
   val ClusterActions = ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet
-  val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS)
+  val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES)
   val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
   val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions
 
diff --git a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
index 92ba0425dc..a7085850b2 100644
--- a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
@@ -213,8 +213,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
     val targetMaxVersionLevel = (defaultFinalizedFeatures().get("feature_1").max() - 1).asInstanceOf[Short]
     testWithInvalidFeatureUpdate[InvalidRequestException](
       "feature_1",
-      new FeatureUpdate(targetMaxVersionLevel,false),
-      ".*Can not downgrade finalized feature.*allowDowngrade.*".r)
+      new FeatureUpdate(targetMaxVersionLevel, FeatureUpdate.UpgradeType.UPGRADE),
+      ".*Can not downgrade finalized feature.*".r)
   }
 
   /**
@@ -226,8 +226,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
     val targetMaxVersionLevel = (defaultFinalizedFeatures().get("feature_1").max() + 1).asInstanceOf[Short]
     testWithInvalidFeatureUpdate[InvalidRequestException](
       "feature_1",
-      new FeatureUpdate(targetMaxVersionLevel, true),
-      ".*When the allowDowngrade flag set in the request, the provided maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
+      new FeatureUpdate(targetMaxVersionLevel,  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+      ".*When the downgradeType is set to SAFE set in the request, the provided maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
   }
 
   /**
@@ -264,7 +264,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
     assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
     assertNotNull(result.errorMessage)
     assertFalse(result.errorMessage.isEmpty)
-    val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than 1.*allowDowngrade.*".r
+    val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than 1.*".r
     assertTrue(exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined, result.errorMessage)
     checkFeatures(
       adminClient,
@@ -282,7 +282,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
   def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
     testWithInvalidFeatureUpdate[InvalidRequestException](
       "feature_non_existing",
-      new FeatureUpdate(3, true),
+      new FeatureUpdate(3.toShort,  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
       ".*Could not apply finalized feature update because the provided feature is not supported.*".r)
   }
 
@@ -295,7 +295,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
     val targetMaxVersionLevel = defaultFinalizedFeatures().get("feature_1").max()
     testWithInvalidFeatureUpdate[InvalidRequestException](
       "feature_1",
-      new FeatureUpdate(targetMaxVersionLevel, false),
+      new FeatureUpdate(targetMaxVersionLevel,  FeatureUpdate.UpgradeType.UPGRADE),
       ".*Can not upgrade a finalized feature.*to the same value.*".r)
   }
 
@@ -331,7 +331,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
     ).getOrElse(Features.emptyFinalizedFeatures())
     val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
 
-    val invalidUpdate = new FeatureUpdate(supportedVersionRange.max(), false)
+    val invalidUpdate = new FeatureUpdate(supportedVersionRange.max(),  FeatureUpdate.UpgradeType.UPGRADE)
     val nodeBefore = getFeatureZNode()
     val adminClient = createAdminClient()
     val result = adminClient.updateFeatures(
@@ -393,10 +393,10 @@ class UpdateFeaturesTest extends BaseRequestTest {
 
     val targetFinalizedFeatures = Features.finalizedFeatures(
       Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
-    val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
-    val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), false)
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
+    val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(),  FeatureUpdate.UpgradeType.UPGRADE)
+    val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(),  FeatureUpdate.UpgradeType.UPGRADE)
 
     val adminClient = createAdminClient()
     adminClient.updateFeatures(
@@ -427,8 +427,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
     updateSupportedFeaturesInAllBrokers(supportedFeatures)
     val initialFinalizedFeatures = Features.finalizedFeatures(
       Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4))))
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
     val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
 
     // Below we aim to do the following:
@@ -436,10 +436,10 @@ class UpdateFeaturesTest extends BaseRequestTest {
     // - Valid downgrade of feature_2 maxVersionLevel from 4 to 3
     val targetFinalizedFeatures = Features.finalizedFeatures(
       Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
-    val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
-    val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), true)
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
+    val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(),  FeatureUpdate.UpgradeType.UPGRADE)
+    val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(),  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
 
     val adminClient = createAdminClient()
     adminClient.updateFeatures(
@@ -471,8 +471,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
     updateSupportedFeaturesInAllBrokers(supportedFeatures)
     val initialFinalizedFeatures = Features.finalizedFeatures(
       Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4))))
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
     val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
 
     // Below we aim to do the following:
@@ -481,10 +481,10 @@ class UpdateFeaturesTest extends BaseRequestTest {
     //   (because we intentionally do not set the allowDowngrade flag)
     val targetFinalizedFeatures = Features.finalizedFeatures(
       Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
-    val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
-    val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), false)
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
+    val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(),  FeatureUpdate.UpgradeType.UPGRADE)
+    val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(),  FeatureUpdate.UpgradeType.UPGRADE)
 
     val adminClient = createAdminClient()
     val result = adminClient.updateFeatures(
@@ -495,7 +495,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
     result.values().get("feature_1").get()
     // Expect update for "feature_2" to have failed.
     checkException[InvalidRequestException](
-      result, Map("feature_2" -> ".*Can not downgrade finalized feature.*allowDowngrade.*".r))
+      result, Map("feature_2" -> ".*Can not downgrade finalized feature.*".r))
     val expectedFeatures = Features.finalizedFeatures(
       Utils.mkMap(
         Utils.mkEntry("feature_1", targetFinalizedFeatures.get("feature_1")),
@@ -539,8 +539,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
 
     val initialFinalizedFeatures = Features.finalizedFeatures(
       Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 4))))
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
     val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
 
     // Below we aim to do the following:
@@ -549,10 +549,10 @@ class UpdateFeaturesTest extends BaseRequestTest {
     // - Valid downgrade of feature_2 maxVersionLevel from 4 to 3
     val targetFinalizedFeatures = Features.finalizedFeatures(
       Utils.mkMap(
-        Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 3)),
-        Utils.mkEntry("feature_2", new FinalizedVersionRange(2, 3))))
-    val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), false)
-    val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), true)
+        Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
+        Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
+    val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(),  FeatureUpdate.UpgradeType.UPGRADE)
+    val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(),  FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
 
     val adminClient = createAdminClient()
     val result = adminClient.updateFeatures(
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index fb3844f23b..29b41c797b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
-import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -178,6 +178,13 @@ public class ClusterControlManager {
         return brokerRegistrations;
     }
 
+    Map<Integer, Map<String, VersionRange>> brokerSupportedVersions() {
+        return brokerRegistrations()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().supportedFeatures()));
+    }
+
     Set<Integer> fencedBrokerIds() {
         return brokerRegistrations.values()
             .stream()
@@ -192,7 +199,7 @@ public class ClusterControlManager {
     public ControllerResult<BrokerRegistrationReply> registerBroker(
             BrokerRegistrationRequestData request,
             long brokerEpoch,
-            FeatureMapAndEpoch finalizedFeatures) {
+            FinalizedControllerFeatures finalizedFeatures) {
         if (heartbeatManager == null) {
             throw new RuntimeException("ClusterControlManager is not active.");
         }
@@ -229,13 +236,14 @@ public class ClusterControlManager {
                 setSecurityProtocol(listener.securityProtocol()));
         }
         for (BrokerRegistrationRequestData.Feature feature : request.features()) {
-            Optional<VersionRange> finalized = finalizedFeatures.map().get(feature.name());
+            Optional<Short> finalized = finalizedFeatures.get(feature.name());
             if (finalized.isPresent()) {
-                if (!finalized.get().contains(new VersionRange(feature.minSupportedVersion(),
-                        feature.maxSupportedVersion()))) {
+                if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized.get())) {
                     throw new UnsupportedVersionException("Unable to register because " +
-                        "the broker has an unsupported version of " + feature.name());
+                            "the broker has an unsupported version of " + feature.name());
                 }
+            } else {
+                log.warn("Broker registered with feature {} that is unknown to the controller", feature.name());
             }
             record.features().add(new BrokerFeature().
                 setName(feature.name()).
@@ -265,10 +273,10 @@ public class ClusterControlManager {
         }
         Map<String, VersionRange> features = new HashMap<>();
         for (BrokerFeature feature : record.features()) {
-            features.put(feature.name(), new VersionRange(
+            features.put(feature.name(), VersionRange.of(
                 feature.minSupportedVersion(), feature.maxSupportedVersion()));
         }
-       
+
         // Update broker registrations.
         BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
                 new BrokerRegistration(brokerId, record.brokerEpoch(),
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index c5fdefffbf..6f99765424 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -36,12 +36,14 @@ import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.UpdateFeaturesRequestData;
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
-import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.authorizer.AclMutator;
 
 import java.util.Collection;
@@ -152,7 +154,7 @@ public interface Controller extends AclMutator, AutoCloseable {
      *
      * @return              A future yielding the feature ranges.
      */
-    CompletableFuture<FeatureMapAndEpoch> finalizedFeatures();
+    CompletableFuture<FinalizedControllerFeatures> finalizedFeatures();
 
     /**
      * Perform some incremental configuration changes.
@@ -247,6 +249,15 @@ public interface Controller extends AclMutator, AutoCloseable {
         AllocateProducerIdsRequestData request
     );
 
+    /**
+     * Update a set of feature flags
+     * @param request   The update features request
+     * @return          A future which yields the result of the action
+     */
+    CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
+        UpdateFeaturesRequestData request
+    );
+
     /**
      * Begin writing a controller snapshot.  If there was already an ongoing snapshot, it
      * simply returns information about that snapshot rather than starting a new one.
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index ed7c98cbb6..307a0ce09d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -25,109 +25,140 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.metadata.FeatureMap;
-import org.apache.kafka.metadata.FeatureMapAndEpoch;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
 
 import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
 
 
 public class FeatureControlManager {
+    private final Logger log;
+
     /**
      * An immutable map containing the features supported by this controller's software.
      */
-    private final Map<String, VersionRange> supportedFeatures;
+    private final QuorumFeatures quorumFeatures;
 
     /**
      * Maps feature names to finalized version ranges.
      */
-    private final TimelineHashMap<String, VersionRange> finalizedVersions;
+    private final TimelineHashMap<String, Short> finalizedVersions;
+
 
-    FeatureControlManager(Map<String, VersionRange> supportedFeatures,
+    FeatureControlManager(LogContext logContext,
+                          QuorumFeatures quorumFeatures,
                           SnapshotRegistry snapshotRegistry) {
-        this.supportedFeatures = supportedFeatures;
+        this.log = logContext.logger(FeatureControlManager.class);
+        this.quorumFeatures = quorumFeatures;
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
-            Map<String, VersionRange> updates, Set<String> downgradeables,
-            Map<Integer, Map<String, VersionRange>> brokerFeatures) {
+            Map<String, Short> updates,
+            Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
+            Map<Integer, Map<String, VersionRange>> brokerFeatures,
+            boolean validateOnly) {
         TreeMap<String, ApiError> results = new TreeMap<>();
         List<ApiMessageAndVersion> records = new ArrayList<>();
-        for (Entry<String, VersionRange> entry : updates.entrySet()) {
+        for (Entry<String, Short> entry : updates.entrySet()) {
             results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
-                downgradeables.contains(entry.getKey()), brokerFeatures, records));
+                upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), brokerFeatures, records));
+        }
+
+        if (validateOnly) {
+            return ControllerResult.of(Collections.emptyList(), results);
+        } else {
+            return ControllerResult.atomicOf(records, results);
         }
+    }
+
+    boolean canSupportVersion(String featureName, short versionRange) {
+        return quorumFeatures.localSupportedFeature(featureName)
+            .filter(localRange -> localRange.contains(versionRange))
+            .isPresent();
+    }
 
-        return ControllerResult.atomicOf(records, results);
+    boolean featureExists(String featureName) {
+        return quorumFeatures.localSupportedFeature(featureName).isPresent();
     }
 
     private ApiError updateFeature(String featureName,
-                                   VersionRange newRange,
-                                   boolean downgradeable,
-                                   Map<Integer, Map<String, VersionRange>> brokerFeatures,
+                                   short newVersion,
+                                   FeatureUpdate.UpgradeType upgradeType,
+                                   Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
                                    List<ApiMessageAndVersion> records) {
-        if (newRange.min() <= 0) {
+        if (!featureExists(featureName)) {
+            return new ApiError(Errors.INVALID_UPDATE_VERSION,
+                    "The controller does not support the given feature.");
+        }
+
+        if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
             return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The lower value for the new range cannot be less than 1.");
+                    "The controller does not support the given upgrade type.");
         }
-        if (newRange.max() <= 0) {
+
+        final Short currentVersion = finalizedVersions.get(featureName);
+
+        if (newVersion <= 0) {
             return new ApiError(Errors.INVALID_UPDATE_VERSION,
                 "The upper value for the new range cannot be less than 1.");
         }
-        VersionRange localRange = supportedFeatures.get(featureName);
-        if (localRange == null || !localRange.contains(newRange)) {
+
+        if (!canSupportVersion(featureName, newVersion)) {
             return new ApiError(Errors.INVALID_UPDATE_VERSION,
                 "The controller does not support the given feature range.");
         }
-        for (Entry<Integer, Map<String, VersionRange>> brokerEntry :
-            brokerFeatures.entrySet()) {
+
+        for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
-            if (brokerRange == null || !brokerRange.contains(newRange)) {
+            if (brokerRange == null || !brokerRange.contains(newVersion)) {
                 return new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "Broker " + brokerEntry.getKey() + " does not support the given " +
                         "feature range.");
             }
         }
-        VersionRange currentRange = finalizedVersions.get(featureName);
-        if (currentRange != null && currentRange.max() > newRange.max()) {
-            if (!downgradeable) {
+
+        if (currentVersion != null && newVersion < currentVersion) {
+            if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
                 return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature without " +
-                    "setting downgradable to true.");
+                    "Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade.");
             }
         }
+
         records.add(new ApiMessageAndVersion(
-            new FeatureLevelRecord().setName(featureName).
-                setMinFeatureLevel(newRange.min()).setMaxFeatureLevel(newRange.max()),
+            new FeatureLevelRecord()
+                .setName(featureName)
+                .setFeatureLevel(newVersion),
             FEATURE_LEVEL_RECORD.highestSupportedVersion()));
         return ApiError.NONE;
     }
 
-    FeatureMapAndEpoch finalizedFeatures(long lastCommittedOffset) {
-        Map<String, VersionRange> features = new HashMap<>();
-        for (Entry<String, VersionRange> entry : finalizedVersions.entrySet(lastCommittedOffset)) {
+    FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) {
+        Map<String, Short> features = new HashMap<>();
+        for (Entry<String, Short> entry : finalizedVersions.entrySet(lastCommittedOffset)) {
             features.put(entry.getKey(), entry.getValue());
         }
-        return new FeatureMapAndEpoch(new FeatureMap(features), lastCommittedOffset);
+        return new FinalizedControllerFeatures(features, lastCommittedOffset);
     }
 
     public void replay(FeatureLevelRecord record) {
-        finalizedVersions.put(record.name(),
-            new VersionRange(record.minFeatureLevel(), record.maxFeatureLevel()));
+        log.info("Setting feature {} to {}", record.name(), record.featureLevel());
+        finalizedVersions.put(record.name(), record.featureLevel());
     }
 
     class FeatureControlIterator implements Iterator<List<ApiMessageAndVersion>> {
-        private final Iterator<Entry<String, VersionRange>> iterator;
+        private final Iterator<Entry<String, Short>> iterator;
 
         FeatureControlIterator(long epoch) {
             this.iterator = finalizedVersions.entrySet(epoch).iterator();
@@ -141,12 +172,10 @@ public class FeatureControlManager {
         @Override
         public List<ApiMessageAndVersion> next() {
             if (!hasNext()) throw new NoSuchElementException();
-            Entry<String, VersionRange> entry = iterator.next();
-            VersionRange versions = entry.getValue();
-            return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
-                setName(entry.getKey()).
-                setMinFeatureLevel(versions.min()).
-                setMaxFeatureLevel(versions.max()), FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            Entry<String, Short> entry = iterator.next();
+            return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
+                .setName(entry.getKey())
+                .setFeatureLevel(entry.getValue()), FEATURE_LEVEL_RECORD.highestSupportedVersion()));
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index d6491e2c14..178ef46bdb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -23,6 +23,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.ProducerIdsBlock;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineLong;
+import org.apache.kafka.timeline.TimelineObject;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,17 +34,19 @@ import java.util.List;
 public class ProducerIdControlManager {
 
     private final ClusterControlManager clusterControlManager;
-    private final TimelineLong nextProducerId; // Initializes to 0
+    private final TimelineObject<ProducerIdsBlock> nextProducerBlock;
+    private final TimelineLong brokerEpoch;
 
     ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
         this.clusterControlManager = clusterControlManager;
-        this.nextProducerId = new TimelineLong(snapshotRegistry);
+        this.nextProducerBlock = new TimelineObject<>(snapshotRegistry, ProducerIdsBlock.EMPTY);
+        this.brokerEpoch = new TimelineLong(snapshotRegistry);
     }
 
     ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
         clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch);
 
-        long firstProducerIdInBlock = nextProducerId.get();
+        long firstProducerIdInBlock = nextProducerBlock.get().firstProducerId();
         if (firstProducerIdInBlock > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " +
                 "has exceeded the int64 type limit");
@@ -60,25 +63,26 @@ public class ProducerIdControlManager {
     }
 
     void replay(ProducerIdsRecord record) {
-        long currentNextProducerId = nextProducerId.get();
+        long currentNextProducerId = nextProducerBlock.get().firstProducerId();
         if (record.nextProducerId() <= currentNextProducerId) {
             throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
                 " is not greater than current next Producer ID (" + currentNextProducerId + ")");
         } else {
-            nextProducerId.set(record.nextProducerId());
+            nextProducerBlock.set(new ProducerIdsBlock(record.brokerId(), record.nextProducerId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
+            brokerEpoch.set(record.brokerEpoch());
         }
     }
 
     Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
         List<ApiMessageAndVersion> records = new ArrayList<>(1);
 
-        long producerId = nextProducerId.get(epoch);
-        if (producerId > 0) {
+        ProducerIdsBlock producerIdBlock = nextProducerBlock.get(epoch);
+        if (producerIdBlock.firstProducerId() > 0) {
             records.add(new ApiMessageAndVersion(
                 new ProducerIdsRecord()
-                    .setNextProducerId(producerId)
-                    .setBrokerId(0)
-                    .setBrokerEpoch(0L),
+                    .setNextProducerId(producerIdBlock.firstProducerId())
+                    .setBrokerId(producerIdBlock.assignedBrokerId())
+                    .setBrokerEpoch(brokerEpoch.get(epoch)),
                 (short) 0));
         }
         return Collections.singleton(records).iterator();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7d3c55bd69..e42799fd1f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.controller;
 
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
@@ -44,6 +45,8 @@ import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.UpdateFeaturesRequestData;
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
 import org.apache.kafka.common.metadata.AccessControlEntryRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
@@ -74,8 +77,7 @@ import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
-import org.apache.kafka.metadata.FeatureMapAndEpoch;
-import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
 import org.apache.kafka.queue.KafkaEventQueue;
@@ -94,6 +96,7 @@ import org.slf4j.Logger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Map;
@@ -143,7 +146,7 @@ public final class QuorumController implements Controller {
         private LogContext logContext = null;
         private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
         private RaftClient<ApiMessageAndVersion> raftClient = null;
-        private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
+        private QuorumFeatures quorumFeatures = null;
         private short defaultReplicationFactor = 3;
         private int defaultNumPartitions = 1;
         private boolean isLeaderRecoverySupported = false;
@@ -188,8 +191,8 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setSupportedFeatures(Map<String, VersionRange> supportedFeatures) {
-            this.supportedFeatures = supportedFeatures;
+        public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
+            this.quorumFeatures = quorumFeatures;
             return this;
         }
 
@@ -263,6 +266,9 @@ public final class QuorumController implements Controller {
             if (raftClient == null) {
                 throw new RuntimeException("You must set a raft client.");
             }
+            if (quorumFeatures == null) {
+                throw new RuntimeException("You must specify the quorum features");
+            }
             if (threadNamePrefix == null) {
                 threadNamePrefix = String.format("Node%d_", nodeId);
             }
@@ -273,11 +279,12 @@ public final class QuorumController implements Controller {
                 controllerMetrics = (ControllerMetrics) Class.forName(
                     "org.apache.kafka.controller.MockControllerMetrics").getConstructor().newInstance();
             }
+
             KafkaEventQueue queue = null;
             try {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
                 return new QuorumController(logContext, nodeId, clusterId, queue, time,
-                    configSchema, raftClient, supportedFeatures, defaultReplicationFactor,
+                    configSchema, raftClient, quorumFeatures, defaultReplicationFactor,
                     defaultNumPartitions, isLeaderRecoverySupported, replicaPlacer, snapshotMaxNewRecordBytes,
                     leaderImbalanceCheckIntervalNs, sessionTimeoutNs, controllerMetrics,
                     createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer,
@@ -1312,7 +1319,7 @@ public final class QuorumController implements Controller {
                              Time time,
                              KafkaConfigSchema configSchema,
                              RaftClient<ApiMessageAndVersion> raftClient,
-                             Map<String, VersionRange> supportedFeatures,
+                             QuorumFeatures quorumFeatures,
                              short defaultReplicationFactor,
                              int defaultNumPartitions,
                              boolean isLeaderRecoverySupported,
@@ -1349,7 +1356,7 @@ public final class QuorumController implements Controller {
         this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
         this.clusterControl = new ClusterControlManager(logContext, clusterId, time,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
-        this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
+        this.featureControl = new FeatureControlManager(logContext, quorumFeatures, snapshotRegistry);
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
         this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
@@ -1446,7 +1453,7 @@ public final class QuorumController implements Controller {
     }
 
     @Override
-    public CompletableFuture<FeatureMapAndEpoch> finalizedFeatures() {
+    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures() {
         return appendReadEvent("getFinalizedFeatures",
             () -> featureControl.finalizedFeatures(lastCommittedOffset));
     }
@@ -1575,6 +1582,31 @@ public final class QuorumController implements Controller {
                     .setProducerIdLen(result.size()));
     }
 
+    @Override
+    public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
+            UpdateFeaturesRequestData request) {
+        return appendWriteEvent("updateFeatures", () -> {
+            Map<String, Short> updates = new HashMap<>();
+            Map<String, FeatureUpdate.UpgradeType> upgradeTypes = new HashMap<>();
+            request.featureUpdates().forEach(featureUpdate -> {
+                String featureName = featureUpdate.feature();
+                upgradeTypes.put(featureName, FeatureUpdate.UpgradeType.fromCode(featureUpdate.upgradeType()));
+                updates.put(featureName, featureUpdate.maxVersionLevel());
+            });
+            return featureControl.updateFeatures(updates, upgradeTypes, clusterControl.brokerSupportedVersions(),
+                request.validateOnly());
+        }).thenApply(result -> {
+            UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData();
+            responseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
+            result.forEach((featureName, error) -> responseData.results().add(
+                new UpdateFeaturesResponseData.UpdatableFeatureResult()
+                    .setFeature(featureName)
+                    .setErrorCode(error.error().code())
+                    .setErrorMessage(error.message())));
+            return responseData;
+        });
+    }
+
     @Override
     public CompletableFuture<List<CreatePartitionsTopicResult>>
             createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
new file mode 100644
index 0000000000..0ee27bd4f3
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metadata.VersionRange;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A holder class of the local node's supported feature flags.
+ */
+public class QuorumFeatures {
+    private final int nodeId;
+    private final Map<String, VersionRange> supportedFeatures;
+
+    QuorumFeatures(int nodeId,
+                          Map<String, VersionRange> supportedFeatures) {
+        this.nodeId = nodeId;
+        this.supportedFeatures = Collections.unmodifiableMap(supportedFeatures);
+    }
+
+    public static QuorumFeatures create(int nodeId,
+                                        Map<String, VersionRange> supportedFeatures) {
+        return new QuorumFeatures(nodeId, supportedFeatures);
+    }
+
+    public static Map<String, VersionRange> defaultFeatureMap() {
+        return Collections.emptyMap();
+    }
+
+    Optional<VersionRange> localSupportedFeature(String featureName) {
+        return Optional.ofNullable(supportedFeatures.get(featureName));
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 781c496f19..ca472322d6 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -19,7 +19,6 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
-import org.apache.kafka.metadata.VersionRange;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -33,13 +32,13 @@ import java.util.Optional;
 public final class FeaturesDelta {
     private final FeaturesImage image;
 
-    private final Map<String, Optional<VersionRange>> changes = new HashMap<>();
+    private final Map<String, Optional<Short>> changes = new HashMap<>();
 
     public FeaturesDelta(FeaturesImage image) {
         this.image = image;
     }
 
-    public Map<String, Optional<VersionRange>> changes() {
+    public Map<String, Optional<Short>> changes() {
         return changes;
     }
 
@@ -52,8 +51,7 @@ public final class FeaturesDelta {
     }
 
     public void replay(FeatureLevelRecord record) {
-        changes.put(record.name(), Optional.of(
-            new VersionRange(record.minFeatureLevel(), record.maxFeatureLevel())));
+        changes.put(record.name(), Optional.of(record.featureLevel()));
     }
 
     public void replay(RemoveFeatureLevelRecord record) {
@@ -61,26 +59,27 @@ public final class FeaturesDelta {
     }
 
     public FeaturesImage apply() {
-        Map<String, VersionRange> newFinalizedVersions =
+        Map<String, Short> newFinalizedVersions =
             new HashMap<>(image.finalizedVersions().size());
-        for (Entry<String, VersionRange> entry : image.finalizedVersions().entrySet()) {
+        for (Entry<String, Short> entry : image.finalizedVersions().entrySet()) {
             String name = entry.getKey();
-            Optional<VersionRange> change = changes.get(name);
+            Optional<Short> change = changes.get(name);
             if (change == null) {
                 newFinalizedVersions.put(name, entry.getValue());
             } else if (change.isPresent()) {
                 newFinalizedVersions.put(name, change.get());
             }
         }
-        for (Entry<String, Optional<VersionRange>> entry : changes.entrySet()) {
+        for (Entry<String, Optional<Short>> entry : changes.entrySet()) {
             String name = entry.getKey();
-            Optional<VersionRange> change = entry.getValue();
+            Optional<Short> change = entry.getValue();
             if (!newFinalizedVersions.containsKey(name)) {
                 if (change.isPresent()) {
                     newFinalizedVersions.put(name, change.get());
                 }
             }
         }
+
         return new FeaturesImage(newFinalizedVersions);
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index f5f372936a..7e0f7fb435 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 import java.util.ArrayList;
@@ -28,7 +27,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
 
@@ -41,9 +39,9 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
 public final class FeaturesImage {
     public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap());
 
-    private final Map<String, VersionRange> finalizedVersions;
+    private final Map<String, Short> finalizedVersions;
 
-    public FeaturesImage(Map<String, VersionRange> finalizedVersions) {
+    public FeaturesImage(Map<String, Short> finalizedVersions) {
         this.finalizedVersions = Collections.unmodifiableMap(finalizedVersions);
     }
 
@@ -51,22 +49,20 @@ public final class FeaturesImage {
         return finalizedVersions.isEmpty();
     }
 
-    Map<String, VersionRange> finalizedVersions() {
+    Map<String, Short> finalizedVersions() {
         return finalizedVersions;
     }
 
-    private Optional<VersionRange> finalizedVersion(String feature) {
+    private Optional<Short> finalizedVersion(String feature) {
         return Optional.ofNullable(finalizedVersions.get(feature));
     }
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
         List<ApiMessageAndVersion> batch = new ArrayList<>();
-        for (Entry<String, VersionRange> entry : finalizedVersions.entrySet()) {
+        for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
             batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(entry.getKey()).
-                setMinFeatureLevel(entry.getValue().min()).
-                setMaxFeatureLevel(entry.getValue().max()),
-                FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+                setFeatureLevel(entry.getValue()), FEATURE_LEVEL_RECORD.highestSupportedVersion()));
         }
         out.accept(batch);
     }
@@ -83,9 +79,11 @@ public final class FeaturesImage {
         return finalizedVersions.equals(other.finalizedVersions);
     }
 
+
     @Override
     public String toString() {
-        return finalizedVersions.entrySet().stream().
-            map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", "));
+        return "FeaturesImage{" +
+                "finalizedVersions=" + finalizedVersions +
+                '}';
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index fd5eb65d1a..cc8ed9b4aa 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -104,7 +104,7 @@ public class BrokerRegistration {
         }
         Map<String, VersionRange> supportedFeatures = new HashMap<>();
         for (BrokerFeature feature : record.features()) {
-            supportedFeatures.put(feature.name(), new VersionRange(
+            supportedFeatures.put(feature.name(), VersionRange.of(
                 feature.minSupportedVersion(), feature.maxSupportedVersion()));
         }
         return new BrokerRegistration(record.brokerId(),
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java
deleted file mode 100644
index 272c87d213..0000000000
--- a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-/**
- * A map of feature names to their supported versions.
- */
-public class FeatureMap {
-    private final Map<String, VersionRange> features;
-
-    public FeatureMap(Map<String, VersionRange> features) {
-        this.features = Collections.unmodifiableMap(new HashMap<>(features));
-    }
-
-    public Optional<VersionRange> get(String name) {
-        return Optional.ofNullable(features.get(name));
-    }
-
-    public Map<String, VersionRange> features() {
-        return features;
-    }
-
-    @Override
-    public int hashCode() {
-        return features.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof FeatureMap)) return false;
-        FeatureMap other = (FeatureMap) o;
-        return features.equals(other.features);
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder bld = new StringBuilder();
-        bld.append("{");
-        bld.append(features.keySet().stream().sorted().
-            map(k -> k + ": " + features.get(k)).
-            collect(Collectors.joining(", ")));
-        bld.append("}");
-        return bld.toString();
-    }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java b/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
similarity index 61%
rename from metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java
rename to metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
index 26096ea7a3..2ebce9e3e6 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
@@ -17,23 +17,31 @@
 
 package org.apache.kafka.metadata;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 
 
 /**
  * A map of feature names to their supported versions.
  */
-public class FeatureMapAndEpoch {
-    private final FeatureMap map;
+public class FinalizedControllerFeatures {
+    private final Map<String, Short> featureMap;
     private final long epoch;
 
-    public FeatureMapAndEpoch(FeatureMap map, long epoch) {
-        this.map = map;
+    public FinalizedControllerFeatures(Map<String, Short> featureMap, long epoch) {
+        this.featureMap = Collections.unmodifiableMap(featureMap);
         this.epoch = epoch;
     }
 
-    public FeatureMap map() {
-        return map;
+    public Optional<Short> get(String name) {
+        return Optional.ofNullable(featureMap.get(name));
+    }
+
+    public Set<String> featureNames() {
+        return featureMap.keySet();
     }
 
     public long epoch() {
@@ -42,21 +50,21 @@ public class FeatureMapAndEpoch {
 
     @Override
     public int hashCode() {
-        return Objects.hash(map, epoch);
+        return Objects.hash(featureMap, epoch);
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof FeatureMapAndEpoch)) return false;
-        FeatureMapAndEpoch other = (FeatureMapAndEpoch) o;
-        return map.equals(other.map) && epoch == other.epoch;
+        if (!(o instanceof FinalizedControllerFeatures)) return false;
+        FinalizedControllerFeatures other = (FinalizedControllerFeatures) o;
+        return featureMap.equals(other.featureMap) && epoch == other.epoch;
     }
 
     @Override
     public String toString() {
         StringBuilder bld = new StringBuilder();
         bld.append("{");
-        bld.append("map=").append(map.toString());
+        bld.append("featureMap=").append(featureMap.toString());
         bld.append(", epoch=").append(epoch);
         bld.append("}");
         return bld.toString();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java b/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
index f171ea14bc..178d338344 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java
@@ -23,16 +23,20 @@ import java.util.Objects;
  * An immutable class which represents version ranges.
  */
 public class VersionRange {
-    public final static VersionRange ALL = new VersionRange((short) 0, Short.MAX_VALUE);
+    public final static VersionRange ALL = of((short) 0, Short.MAX_VALUE);
 
     private final short min;
     private final short max;
 
-    public VersionRange(short min, short max) {
+    private VersionRange(short min, short max) {
         this.min = min;
         this.max = max;
     }
 
+    public static VersionRange of(short min, short max) {
+        return new VersionRange(min, max);
+    }
+
     public short min() {
         return min;
     }
@@ -41,8 +45,18 @@ public class VersionRange {
         return max;
     }
 
-    public boolean contains(VersionRange other) {
-        return other.min >= min && other.max <= max;
+    /**
+     * Check if a given version is fully contained within this range
+     */
+    public boolean contains(short version) {
+        return version >= min && version <= max;
+    }
+
+    /**
+     * Check if a given version range has overlap with this one
+     */
+    public boolean intersects(VersionRange other) {
+        return other.min <= max && other.max >= min;
     }
 
     @Override
diff --git a/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json b/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
index ac112f15c2..03ff347eb8 100644
--- a/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
+++ b/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
@@ -22,9 +22,7 @@
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The feature name." },
-    { "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
-      "about": "The current finalized minimum feature level of this feature for the cluster." },
-    { "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
-      "about": "The current finalized maximum feature level of this feature for the cluster." }
+    { "name": "FeatureLevel", "type": "int16", "versions": "0+",
+      "about": "The current finalized feature level of this feature for the cluster." }
   ]
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 124cb3d5f3..ea223d7de8 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -37,8 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.metadata.BrokerRegistration;
-import org.apache.kafka.metadata.FeatureMap;
-import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -103,7 +102,7 @@ public class ClusterControlManagerTest {
                     setRack(null).
                     setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
                 123L,
-                new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), 456L)));
+                new FinalizedControllerFeatures(Collections.emptyMap(), 456L)));
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 680253c712..f53b493a97 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -23,12 +23,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.metadata.FeatureMap;
-import org.apache.kafka.metadata.FeatureMapAndEpoch;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -48,59 +49,87 @@ public class FeatureControlManagerTest {
             String feature = (String) args[i];
             Integer low = (Integer) args[i + 1];
             Integer high = (Integer) args[i + 2];
-            result.put(feature, new VersionRange(low.shortValue(), high.shortValue()));
+            result.put(feature, VersionRange.of(low.shortValue(), high.shortValue()));
+        }
+        return result;
+    }
+
+    private static Map<String, Short> versionMap(Object... args) {
+        Map<String, Short> result = new HashMap<>();
+        for (int i = 0; i < args.length; i += 2) {
+            String feature = (String) args[i];
+            Integer ver = (Integer) args[i + 1];
+            result.put(feature, ver.shortValue());
+        }
+        return result;
+    }
+
+    public static QuorumFeatures features(Object... args) {
+        return QuorumFeatures.create(0, rangeMap(args));
+    }
+
+    private static Map<String, Short> updateMap(Object... args) {
+        Map<String, Short> result = new HashMap<>();
+        for (int i = 0; i < args.length; i += 2) {
+            String feature = (String) args[i];
+            Integer ver = (Integer) args[i + 1];
+            result.put(feature, ver.shortValue());
         }
         return result;
     }
 
     @Test
     public void testUpdateFeatures() {
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
         snapshotRegistry.getOrCreateSnapshot(-1);
-        FeatureControlManager manager = new FeatureControlManager(
-            rangeMap("foo", 1, 2), snapshotRegistry);
-        assertEquals(new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), -1),
+        FeatureControlManager manager = new FeatureControlManager(logContext,
+            features("foo", 1, 2), snapshotRegistry);
+        assertEquals(new FinalizedControllerFeatures(Collections.emptyMap(), -1),
             manager.finalizedFeatures(-1));
         assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "The controller does not support the given feature range."))),
-            manager.updateFeatures(rangeMap("foo", 1, 3),
-                Collections.singleton("foo"),
-                Collections.emptyMap()));
+            manager.updateFeatures(updateMap("foo", 3),
+                Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+                Collections.emptyMap(), false));
         ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
-            rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(),
-                Collections.emptyMap());
+                updateMap("foo", 2, "bar", 1), Collections.emptyMap(),
+                Collections.emptyMap(), false);
         Map<String, ApiError> expectedMap = new HashMap<>();
         expectedMap.put("foo", ApiError.NONE);
         expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature range."));
+                "The controller does not support the given feature."));
         assertEquals(expectedMap, result.response());
         List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
         expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-            setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2),
+            setName("foo").setFeatureLevel((short) 2),
             (short) 0));
         assertEquals(expectedMessages, result.records());
     }
 
     @Test
     public void testReplay() {
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
         FeatureLevelRecord record = new FeatureLevelRecord().
-            setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2);
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+            setName("foo").setFeatureLevel((short) 2);
+
         snapshotRegistry.getOrCreateSnapshot(-1);
-        FeatureControlManager manager = new FeatureControlManager(
-            rangeMap("foo", 1, 2), snapshotRegistry);
+        FeatureControlManager manager = new FeatureControlManager(logContext,
+            features("foo", 1, 2), snapshotRegistry);
         manager.replay(record);
         snapshotRegistry.getOrCreateSnapshot(123);
-        assertEquals(new FeatureMapAndEpoch(new FeatureMap(rangeMap("foo", 1, 2)), 123),
+        assertEquals(new FinalizedControllerFeatures(versionMap("foo", 2), 123),
             manager.finalizedFeatures(123));
     }
 
     @Test
     public void testUpdateFeaturesErrorCases() {
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        FeatureControlManager manager = new FeatureControlManager(
-            rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        FeatureControlManager manager = new FeatureControlManager(logContext,
+            features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
 
         assertEquals(
             ControllerResult.atomicOf(
@@ -114,24 +143,24 @@ public class FeatureControlManagerTest {
                 )
             ),
             manager.updateFeatures(
-                rangeMap("foo", 1, 3),
-                Collections.singleton("foo"),
-                Collections.singletonMap(5, rangeMap())
-            )
+                updateMap("foo", 3),
+                Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+                Collections.singletonMap(5, rangeMap()),
+                false)
         );
 
         ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
-            rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap());
+            updateMap("foo", 3), Collections.emptyMap(), Collections.emptyMap(), false);
         assertEquals(Collections.singletonMap("foo", ApiError.NONE), result.response());
         manager.replay((FeatureLevelRecord) result.records().get(0).message());
         snapshotRegistry.getOrCreateSnapshot(3);
 
         assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature without " +
-                    "setting downgradable to true."))),
-            manager.updateFeatures(rangeMap("foo", 1, 2),
-                Collections.emptySet(), Collections.emptyMap()));
+                    "Can't downgrade the maximum version of this feature without setting the upgrade type to " +
+                    "safe or unsafe downgrade."))),
+            manager.updateFeatures(updateMap("foo", 2),
+                Collections.emptyMap(), Collections.emptyMap(), false));
 
         assertEquals(
             ControllerResult.atomicOf(
@@ -139,39 +168,37 @@ public class FeatureControlManagerTest {
                     new ApiMessageAndVersion(
                         new FeatureLevelRecord()
                             .setName("foo")
-                            .setMinFeatureLevel((short) 1)
-                            .setMaxFeatureLevel((short) 2),
+                            .setFeatureLevel((short) 2),
                         (short) 0
                     )
                 ),
                 Collections.singletonMap("foo", ApiError.NONE)
             ),
             manager.updateFeatures(
-                rangeMap("foo", 1, 2),
-                Collections.singleton("foo"),
-                Collections.emptyMap()
-            )
+                updateMap("foo", 2),
+                Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
+                Collections.emptyMap(),
+                false)
         );
     }
 
     @Test
     public void testFeatureControlIterator() throws Exception {
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        FeatureControlManager manager = new FeatureControlManager(
-            rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        FeatureControlManager manager = new FeatureControlManager(logContext,
+            features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
         ControllerResult<Map<String, ApiError>> result = manager.
-            updateFeatures(rangeMap("foo", 1, 5, "bar", 1, 1),
-                Collections.emptySet(), Collections.emptyMap());
+            updateFeatures(updateMap("foo", 5, "bar", 1),
+                Collections.emptyMap(), Collections.emptyMap(), false);
         RecordTestUtils.replayAll(manager, result.records());
         RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
             Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName("foo").
-                setMinFeatureLevel((short) 1).
-                setMaxFeatureLevel((short) 5), (short) 0)),
+                setFeatureLevel((short) 5), (short) 0)),
             Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName("bar").
-                setMinFeatureLevel((short) 1).
-                setMaxFeatureLevel((short) 1), (short) 0))),
+                setFeatureLevel((short) 1), (short) 0))),
             manager.iterator(Long.MAX_VALUE));
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 2ec06de808..40b7274f02 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -178,7 +178,7 @@ public class QuorumControllerTest {
     private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
                                                     QuorumController controller)
                                                     throws Throwable {
-        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(0L));
+        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2L));
         CompletableFuture<Map<ConfigResource, ApiError>> future1 =
             controller.incrementalAlterConfigs(Collections.singletonMap(
                 BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false);
@@ -187,7 +187,7 @@ public class QuorumControllerTest {
             new ResultOrError<>(Collections.emptyMap())),
             controller.describeConfigs(Collections.singletonMap(
                 BROKER0, Collections.emptyList())).get());
-        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2L));
+        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(3L));
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
     }
 
@@ -426,6 +426,7 @@ public class QuorumControllerTest {
                         setBrokerId(0).
                         setClusterId(active.clusterId()).
                         setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                        setFeatures(brokerFeatures()).
                         setListeners(listeners));
                 assertEquals(0L, reply.get().epoch());
                 CreateTopicsRequestData createTopicsRequestData =
@@ -466,6 +467,11 @@ public class QuorumControllerTest {
         }
     }
 
+    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
+        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
+        return features;
+    }
+
     @Test
     public void testSnapshotSaveAndLoad() throws Throwable {
         final int numBrokers = 4;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 43346fe1fb..b7bff3883b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -61,6 +61,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
             for (int i = 0; i < numControllers; i++) {
                 QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
                 builder.setRaftClient(logEnv.logManagers().get(i));
+                builder.setQuorumFeatures(new QuorumFeatures(i, QuorumFeatures.defaultFeatureMap()));
                 sessionTimeoutMillis.ifPresent(timeout -> {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
                 });
diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
index 6908cf2a78..e10d4a5971 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
@@ -60,14 +60,14 @@ public class ClusterImageTest {
             1000,
             Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"),
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
-            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 3)),
+            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
             Optional.empty(),
             true));
         map1.put(1, new BrokerRegistration(1,
             1001,
             Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"),
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
-            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 3)),
+            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
             Optional.empty(),
             false));
         map1.put(2, new BrokerRegistration(2,
@@ -96,14 +96,14 @@ public class ClusterImageTest {
             1000,
             Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"),
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
-            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 3)),
+            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
             Optional.empty(),
             false));
         map2.put(1, new BrokerRegistration(1,
             1001,
             Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"),
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
-            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 3)),
+            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
             Optional.empty(),
             true));
         IMAGE2 = new ClusterImage(map2);
diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index 720086f87b..52388fbf33 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -20,7 +20,6 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
 import org.apache.kafka.metadata.RecordTestUtils;
-import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -43,15 +42,15 @@ public class FeaturesImageTest {
     final static FeaturesImage IMAGE2;
 
     static {
-        Map<String, VersionRange> map1 = new HashMap<>();
-        map1.put("foo", new VersionRange((short) 1, (short) 2));
-        map1.put("bar", new VersionRange((short) 1, (short) 1));
-        map1.put("baz", new VersionRange((short) 1, (short) 8));
+        Map<String, Short> map1 = new HashMap<>();
+        map1.put("foo", (short) 2);
+        map1.put("bar", (short) 1);
+        map1.put("baz", (short) 8);
         IMAGE1 = new FeaturesImage(map1);
 
         DELTA1_RECORDS = new ArrayList<>();
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-            setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 3),
+            setName("foo").setFeatureLevel((short) 3),
             FEATURE_LEVEL_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveFeatureLevelRecord().
             setName("bar"), REMOVE_FEATURE_LEVEL_RECORD.highestSupportedVersion()));
@@ -61,8 +60,8 @@ public class FeaturesImageTest {
         DELTA1 = new FeaturesDelta(IMAGE1);
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
 
-        Map<String, VersionRange> map2 = new HashMap<>();
-        map2.put("foo", new VersionRange((short) 1, (short) 3));
+        Map<String, Short> map2 = new HashMap<>();
+        map2.put("foo", (short) 3);
         IMAGE2 = new FeaturesImage(map2);
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 0f350c4622..d9622b8e4c 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -40,15 +40,15 @@ public class BrokerRegistrationTest {
     private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList(
         new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
             Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
-            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2)),
             Optional.empty(), false),
         new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
             Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
-            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+            Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2)),
             Optional.empty(), false),
         new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
             Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
-            Collections.singletonMap("foo", new VersionRange((short) 2, (short) 3)),
+            Collections.singletonMap("foo", VersionRange.of((short) 2, (short) 3)),
             Optional.of("myrack"), false));
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
index 88082a6f55..d31e8f8139 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
@@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
@@ -31,24 +32,24 @@ public class VersionRangeTest {
         assertTrue(a >= Short.MIN_VALUE);
         assertTrue(b <= Short.MAX_VALUE);
         assertTrue(b >= Short.MIN_VALUE);
-        return new VersionRange((short) a, (short) b);
+        return VersionRange.of((short) a, (short) b);
     }
 
     @Test
     public void testEquality() {
         assertEquals(v(1, 1), v(1, 1));
-        assertFalse(v(1, 1).equals(v(1, 2)));
-        assertFalse(v(2, 1).equals(v(1, 2)));
-        assertFalse(v(2, 1).equals(v(2, 2)));
+        assertNotEquals(v(1, 2), v(1, 1));
+        assertNotEquals(v(1, 2), v(2, 1));
+        assertNotEquals(v(2, 2), v(2, 1));
     }
 
     @Test
     public void testContains() {
-        assertTrue(v(1, 1).contains(v(1, 1)));
-        assertFalse(v(1, 1).contains(v(1, 2)));
-        assertTrue(v(1, 2).contains(v(1, 1)));
-        assertFalse(v(4, 10).contains(v(3, 8)));
-        assertTrue(v(2, 12).contains(v(3, 11)));
+        assertTrue(v(1, 1).contains((short) 1));
+        assertFalse(v(1, 1).contains((short) 2));
+        assertTrue(v(1, 2).contains((short) 1));
+        assertFalse(v(4, 10).contains((short) 3));
+        assertTrue(v(2, 12).contains((short) 11));
     }
 
     @Test
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index b36d4f1563..866f541fb2 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -240,6 +240,7 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
             throw new IllegalArgumentException();
         }
 
+        // Read the metadata record body from the file input reader
         T record = serde.read(input, valueSize);
 
         int numHeaders = input.readVarint();