You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/30 23:57:36 UTC

[kafka] branch 3.3 updated: KAFKA-14187: kafka-features.sh: add support for --metadata (#12571)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 26a884cc30 KAFKA-14187: kafka-features.sh: add support for --metadata (#12571)
26a884cc30 is described below

commit 26a884cc307c45c978ffda2e2a2a0073089aadbc
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Tue Aug 30 16:56:03 2022 -0700

    KAFKA-14187: kafka-features.sh: add support for --metadata (#12571)
    
    This PR adds support to kafka-features.sh for the --metadata flag, as specified in KIP-778.  This
    flag makes it possible to upgrade to a new metadata version without consulting a table mapping
    version names to short integers. Change --feature to use a key=value format.
    
    FeatureCommandTest.scala: make most tests here true unit tests (that don't start brokers) in order
    to improve test run time, and allow us to test more cases. For the integration test part, test both
    KRaft and ZK-based clusters. Add support for mocking feature operations in MockAdminClient.java.
    
    upgrade.html: add a section describing how the metadata.version should be upgraded in KRaft
    clusters.
    
    Add kraft_upgrade_test.py to test upgrades between KRaft versions.
    
    Reviewers: David Arthur <mu...@gmail.com>, dengziming <de...@gmail.com>, José Armando García Sancio <js...@gmail.com>
---
 .../kafka/clients/admin/FinalizedVersionRange.java |   4 +-
 .../kafka/clients/admin/SupportedVersionRange.java |   4 +-
 .../kafka/clients/admin/MockAdminClient.java       | 138 +++++++-
 .../main/scala/kafka/admin/FeatureCommand.scala    | 356 ++++++++++----------
 .../unit/kafka/admin/FeatureCommandTest.scala      | 370 ++++++++++++++++-----
 docs/upgrade.html                                  |  20 ++
 tests/kafkatest/services/kafka/kafka.py            |  13 +
 tests/kafkatest/tests/core/kraft_upgrade_test.py   | 122 +++++++
 tests/kafkatest/version.py                         |   2 +
 9 files changed, 742 insertions(+), 287 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java b/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
index aa0401a8a8..1442de5851 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
@@ -36,10 +36,10 @@ public class FinalizedVersionRange {
      * @throws IllegalArgumentException   Raised when the condition described above is not met.
      */
     FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {
-        if (minVersionLevel < 1 || maxVersionLevel < 1 || maxVersionLevel < minVersionLevel) {
+        if (minVersionLevel < 0 || maxVersionLevel < 0 || maxVersionLevel < minVersionLevel) {
             throw new IllegalArgumentException(
                 String.format(
-                    "Expected minVersionLevel >= 1, maxVersionLevel >= 1 and" +
+                    "Expected minVersionLevel >= 0, maxVersionLevel >= 0 and" +
                     " maxVersionLevel >= minVersionLevel, but received" +
                     " minVersionLevel: %d, maxVersionLevel: %d", minVersionLevel, maxVersionLevel));
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java b/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
index d71da31fb8..b85a392a65 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
@@ -36,10 +36,10 @@ public class SupportedVersionRange {
      * @throws IllegalArgumentException   Raised when the condition described above is not met.
      */
     SupportedVersionRange(final short minVersion, final short maxVersion) {
-        if (minVersion < 1 || maxVersion < 1 || maxVersion < minVersion) {
+        if (minVersion < 0 || maxVersion < 0 || maxVersion < minVersion) {
             throw new IllegalArgumentException(
                 String.format(
-                    "Expected 1 <= minVersion <= maxVersion but received minVersion:%d, maxVersion:%d.",
+                    "Expected 0 <= minVersion <= maxVersion but received minVersion:%d, maxVersion:%d.",
                     minVersion,
                     maxVersion));
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 8c31c7cf69..32b659fe4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.InvalidUpdateVersionException;
 import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -77,6 +78,9 @@ public class MockAdminClient extends AdminClient {
     private final Map<TopicPartition, Long> endOffsets;
     private final Map<TopicPartition, Long> committedOffsets;
     private final boolean usingRaftController;
+    private final Map<String, Short> featureLevels;
+    private final Map<String, Short> minSupportedFeatureLevels;
+    private final Map<String, Short> maxSupportedFeatureLevels;
     private final String clusterId;
     private final List<List<String>> brokerLogDirs;
     private final List<Map<String, String>> brokerConfigs;
@@ -102,6 +106,9 @@ public class MockAdminClient extends AdminClient {
         private Short defaultPartitions;
         private boolean usingRaftController = false;
         private Integer defaultReplicationFactor;
+        private Map<String, Short> featureLevels = Collections.emptyMap();
+        private Map<String, Short> minSupportedFeatureLevels = Collections.emptyMap();
+        private Map<String, Short> maxSupportedFeatureLevels = Collections.emptyMap();
 
         public Builder() {
             numBrokers(1);
@@ -156,6 +163,21 @@ public class MockAdminClient extends AdminClient {
             return this;
         }
 
+        public Builder featureLevels(Map<String, Short> featureLevels) {
+            this.featureLevels = featureLevels;
+            return this;
+        }
+
+        public Builder minSupportedFeatureLevels(Map<String, Short> minSupportedFeatureLevels) {
+            this.minSupportedFeatureLevels = minSupportedFeatureLevels;
+            return this;
+        }
+
+        public Builder maxSupportedFeatureLevels(Map<String, Short> maxSupportedFeatureLevels) {
+            this.maxSupportedFeatureLevels = maxSupportedFeatureLevels;
+            return this;
+        }
+
         public MockAdminClient build() {
             return new MockAdminClient(brokers,
                 controller == null ? brokers.get(0) : controller,
@@ -163,7 +185,10 @@ public class MockAdminClient extends AdminClient {
                 defaultPartitions != null ? defaultPartitions.shortValue() : 1,
                 defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3),
                 brokerLogDirs,
-                usingRaftController);
+                usingRaftController,
+                featureLevels,
+                minSupportedFeatureLevels,
+                maxSupportedFeatureLevels);
         }
     }
 
@@ -172,17 +197,30 @@ public class MockAdminClient extends AdminClient {
     }
 
     public MockAdminClient(List<Node> brokers, Node controller) {
-        this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
-            Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS), false);
-    }
-
-    private MockAdminClient(List<Node> brokers,
-                            Node controller,
-                            String clusterId,
-                            int defaultPartitions,
-                            int defaultReplicationFactor,
-                            List<List<String>> brokerLogDirs,
-                            boolean usingRaftController) {
+        this(brokers,
+            controller,
+            DEFAULT_CLUSTER_ID,
+            1,
+            brokers.size(),
+            Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS),
+            false,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap());
+    }
+
+    private MockAdminClient(
+        List<Node> brokers,
+        Node controller,
+        String clusterId,
+        int defaultPartitions,
+        int defaultReplicationFactor,
+        List<List<String>> brokerLogDirs,
+        boolean usingRaftController,
+        Map<String, Short> featureLevels,
+        Map<String, Short> minSupportedFeatureLevels,
+        Map<String, Short> maxSupportedFeatureLevels
+    ) {
         this.brokers = brokers;
         controller(controller);
         this.clusterId = clusterId;
@@ -199,6 +237,9 @@ public class MockAdminClient extends AdminClient {
         this.endOffsets = new HashMap<>();
         this.committedOffsets = new HashMap<>();
         this.usingRaftController = usingRaftController;
+        this.featureLevels = new HashMap<>(featureLevels);
+        this.minSupportedFeatureLevels = new HashMap<>(minSupportedFeatureLevels);
+        this.maxSupportedFeatureLevels = new HashMap<>(maxSupportedFeatureLevels);
     }
 
     synchronized public void controller(Node controller) {
@@ -995,12 +1036,79 @@ public class MockAdminClient extends AdminClient {
 
     @Override
     public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();
+        Map<String, SupportedVersionRange> supportedFeatures = new HashMap<>();
+        for (Map.Entry<String, Short> entry : featureLevels.entrySet()) {
+            finalizedFeatures.put(entry.getKey(), new FinalizedVersionRange(
+                    entry.getValue(), entry.getValue()));
+            supportedFeatures.put(entry.getKey(), new SupportedVersionRange(
+                    minSupportedFeatureLevels.get(entry.getKey()),
+                    maxSupportedFeatureLevels.get(entry.getKey())));
+        }
+        return new DescribeFeaturesResult(KafkaFuture.completedFuture(
+                new FeatureMetadata(finalizedFeatures,
+                    Optional.of(123L),
+                    supportedFeatures)));
     }
 
     @Override
-    public UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+    public UpdateFeaturesResult updateFeatures(
+        Map<String, FeatureUpdate> featureUpdates,
+        UpdateFeaturesOptions options
+    ) {
+        Map<String, KafkaFuture<Void>> results = new HashMap<>();
+        for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
+            KafkaFutureImpl<Void> future = new KafkaFutureImpl<Void>();
+            String feature = entry.getKey();
+            try {
+                short cur = featureLevels.getOrDefault(feature, (short) 0);
+                short next = entry.getValue().maxVersionLevel();
+                short min = minSupportedFeatureLevels.getOrDefault(feature, (short) 0);
+                short max = maxSupportedFeatureLevels.getOrDefault(feature, (short) 0);
+                switch (entry.getValue().upgradeType()) {
+                    case UNKNOWN:
+                        throw new InvalidRequestException("Invalid upgrade type.");
+                    case UPGRADE:
+                        if (cur > next) {
+                            throw new InvalidUpdateVersionException("Can't upgrade to lower version.");
+                        }
+                        break;
+                    case SAFE_DOWNGRADE:
+                        if (cur < next) {
+                            throw new InvalidUpdateVersionException("Can't downgrade to newer version.");
+                        }
+                        break;
+                    case UNSAFE_DOWNGRADE:
+                        if (cur < next) {
+                            throw new InvalidUpdateVersionException("Can't downgrade to newer version.");
+                        }
+                        while (next != cur) {
+                            // Simulate a scenario where all the even feature levels unsafe to downgrade from.
+                            if (cur % 2 == 0) {
+                                if (entry.getValue().upgradeType() == FeatureUpdate.UpgradeType.SAFE_DOWNGRADE) {
+                                    throw new InvalidUpdateVersionException("Unable to perform a safe downgrade.");
+                                }
+                            }
+                            cur--;
+                        }
+                        break;
+                }
+                if (next < min) {
+                    throw new InvalidUpdateVersionException("Can't downgrade below " + min);
+                }
+                if (next > max) {
+                    throw new InvalidUpdateVersionException("Can't upgrade above " + max);
+                }
+                if (!options.validateOnly()) {
+                    featureLevels.put(feature, next);
+                }
+                future.complete(null);
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+            results.put(feature, future);
+        }
+        return new UpdateFeaturesResult(results);
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala
index c5c62648f4..60ece8e4bd 100644
--- a/core/src/main/scala/kafka/admin/FeatureCommand.scala
+++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala
@@ -20,28 +20,33 @@ package kafka.admin
 import kafka.tools.TerseFailure
 import kafka.utils.Exit
 import net.sourceforge.argparse4j.ArgumentParsers
-import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, storeTrue}
-import net.sourceforge.argparse4j.inf.{Namespace, Subparsers}
+import net.sourceforge.argparse4j.impl.Arguments.{append, fileType, store, storeTrue}
+import net.sourceforge.argparse4j.inf.{ArgumentParserException, Namespace, Subparsers}
+import net.sourceforge.argparse4j.internal.HelpScreenException
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
-import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.common.MetadataVersion
 
-import java.io.File
+import java.io.{File, PrintStream}
 import java.util.Properties
-import scala.collection.Seq
 import scala.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
+import scala.compat.java8.OptionConverters._
 
 object FeatureCommand {
-
   def main(args: Array[String]): Unit = {
-    val res = mainNoExit(args)
+    val res = mainNoExit(args, System.out)
     Exit.exit(res)
   }
 
-  // This is used for integration tests in order to avoid killing the test with Exit.exit
-  def mainNoExit(args: Array[String]): Int = {
+  // This is used for integration tests in order to avoid killing the test with Exit.exit,
+  // and in order to capture the command output.
+  def mainNoExit(
+    args: Array[String],
+    out: PrintStream
+  ): Int = {
     val parser = ArgumentParsers.newArgumentParser("kafka-features")
       .defaultHelp(true)
       .description("This tool manages feature flags in Kafka.")
@@ -59,7 +64,7 @@ object FeatureCommand {
     addDisableParser(subparsers)
 
     try {
-      val namespace = parser.parseArgsOrFail(args)
+      val namespace = parser.parseArgs(args)
       val command = namespace.getString("command")
 
       val commandConfig = namespace.get[File]("command_config")
@@ -75,14 +80,19 @@ object FeatureCommand {
       val admin = Admin.create(props)
 
       command match {
-        case "describe" => handleDescribe(namespace, admin)
-        case "upgrade" => handleUpgrade(namespace, admin)
-        case "downgrade" => handleDowngrade(namespace, admin)
-        case "disable" => handleDisable(namespace, admin)
+        case "describe" => handleDescribe(out, admin)
+        case "upgrade" => handleUpgrade(out, namespace, admin)
+        case "downgrade" => handleDowngrade(out, namespace, admin)
+        case "disable" => handleDisable(out, namespace, admin)
       }
       admin.close()
       0
     } catch {
+      case _: HelpScreenException =>
+        0
+      case e: ArgumentParserException =>
+        System.err.println(e.getMessage)
+        1
       case e: TerseFailure =>
         System.err.println(e.getMessage)
         1
@@ -90,175 +100,186 @@ object FeatureCommand {
   }
 
   def addDescribeParser(subparsers: Subparsers): Unit = {
-    val describeParser = subparsers.addParser("describe")
-      .help("Describe one or more feature flags.")
-
-    val featureArgs = describeParser.addArgumentGroup("Specific Features")
-    featureArgs.addArgument("--feature")
-      .action(append())
-      .help("A specific feature to describe. This option may be repeated for describing multiple feature flags.")
-
-    val releaseArgs = describeParser.addArgumentGroup("All Features for release")
-    releaseArgs.addArgument("--release")
+    subparsers.addParser("describe")
+      .help("Describes the current active feature flags.")
   }
 
   def addUpgradeParser(subparsers: Subparsers): Unit = {
     val upgradeParser = subparsers.addParser("upgrade")
       .help("Upgrade one or more feature flags.")
-
-    val featureArgs = upgradeParser.addArgumentGroup("Upgrade specific features")
-    featureArgs.addArgument("--feature")
-      .action(append())
-      .help("A feature flag to upgrade. This option may be repeated for upgrading multiple feature flags.")
-    featureArgs.addArgument("--version")
-      .`type`(classOf[Short])
-      .help("The version to upgrade to.")
+    upgradeParser.addArgument("--metadata")
+      .help("The level to which we should upgrade the metadata. For example, 3.3-IV3.")
+      .action(store())
+    upgradeParser.addArgument("--feature")
+      .help("A feature upgrade we should perform, in key=value format. For example metadata.version=3.3-IV3.")
       .action(append())
-
-    val releaseArgs = upgradeParser.addArgumentGroup("Upgrade to feature level defined for a given release")
-    releaseArgs.addArgument("--release")
-
     upgradeParser.addArgument("--dry-run")
-      .help("Perform a dry-run of this upgrade operation.")
+      .help("Validate this upgrade, but do not perform it.")
       .action(storeTrue())
   }
 
   def addDowngradeParser(subparsers: Subparsers): Unit = {
     val downgradeParser = subparsers.addParser("downgrade")
-      .help("Upgrade one or more feature flags.")
-
+      .help("Downgrade one or more feature flags.")
+    downgradeParser.addArgument("--metadata")
+      .help("The level to which we should downgrade the metadata. For example, 3.3-IV0.")
+      .action(store())
     downgradeParser.addArgument("--feature")
-      .help("A feature flag to downgrade. This option may be repeated for downgrade multiple feature flags.")
-      .required(true)
-      .action(append())
-    downgradeParser.addArgument("--version")
-      .`type`(classOf[Short])
-      .help("The version to downgrade to.")
-      .required(true)
+      .help("A feature downgrade we should perform, in key=value format. " +
+        "For example metadata.version=3.3-IV0.")
       .action(append())
     downgradeParser.addArgument("--unsafe")
-      .help("Perform this downgrade even if it considered unsafe. Refer to specific feature flag documentation for details.")
+      .help("Perform this downgrade even if it may irreversibly destroy metadata.")
       .action(storeTrue())
     downgradeParser.addArgument("--dry-run")
-      .help("Perform a dry-run of this downgrade operation.")
+      .help("Validate this downgrade, but do not perform it.")
       .action(storeTrue())
   }
 
   def addDisableParser(subparsers: Subparsers): Unit = {
     val disableParser = subparsers.addParser("disable")
       .help("Disable one or more feature flags. This is the same as downgrading the version to zero.")
-
     disableParser.addArgument("--feature")
-      .help("A feature flag to disable. This option may be repeated for disable multiple feature flags.")
-      .required(true)
+      .help("A feature flag to disable.")
       .action(append())
     disableParser.addArgument("--unsafe")
-      .help("Disable the feature flag(s) even if it considered unsafe. Refer to specific feature flag documentation for details.")
+      .help("Disable this feature flag even if it may irreversibly destroy metadata.")
       .action(storeTrue())
     disableParser.addArgument("--dry-run")
       .help("Perform a dry-run of this disable operation.")
       .action(storeTrue())
   }
 
-  def handleDescribe(namespace: Namespace, admin: Admin): Unit = {
-    val featureFilter = parseFeaturesOrRelease(namespace) match {
-      case Neither() => (_: String) => true
-      case Features(featureNames) => (feature: String) => featureNames.contains(feature)
-      case Release(release) =>
-        // Special case, print the versions associated with the given release
-        printReleaseFeatures(release)
-        return
-      case Both() => throw new TerseFailure("Only one of --release or --feature may be specified with describe sub-command.")
+  def levelToString(
+    feature: String,
+    level: Short
+  ): String = {
+    if (feature.equals(MetadataVersion.FEATURE_NAME)) {
+      try {
+        MetadataVersion.fromFeatureLevel(level).version()
+      } catch {
+        case e: Throwable => s"UNKNOWN [${level}]"
+      }
+    } else {
+      level.toString
     }
+  }
 
+  def handleDescribe(
+    out: PrintStream,
+    admin: Admin
+  ): Unit = {
     val featureMetadata = admin.describeFeatures().featureMetadata().get()
-    val featureEpoch = featureMetadata.finalizedFeaturesEpoch()
-    val epochString = if (featureEpoch.isPresent) {
-      s"Epoch: ${featureEpoch.get}"
-    } else {
-      "Epoch: -"
-    }
-    val finalized = featureMetadata.finalizedFeatures().asScala
-    featureMetadata.supportedFeatures().asScala.foreach {
-      case (feature, range) =>
-        if (featureFilter.apply(feature)) {
-          if (finalized.contains(feature)) {
-            println(s"Feature: $feature\tSupportedMinVersion: ${range.minVersion()}\t" +
-              s"SupportedMaxVersion: ${range.maxVersion()}\tFinalizedVersionLevel: ${finalized(feature).maxVersionLevel()}\t$epochString")
-          } else {
-            println(s"Feature: $feature\tSupportedMinVersion: ${range.minVersion()}\t" +
-              s"SupportedMaxVersion: ${range.maxVersion()}\tFinalizedVersionLevel: -\t$epochString")
-          }
+    val featureList = new java.util.TreeSet[String](featureMetadata.supportedFeatures().keySet())
+      featureList.forEach {
+      case feature =>
+        val finalizedLevel = featureMetadata.finalizedFeatures().asScala.get(feature) match {
+          case None => 0.toShort
+          case Some(v) => v.maxVersionLevel()
         }
+        val range = featureMetadata.supportedFeatures().get(feature)
+        out.printf("Feature: %s\tSupportedMinVersion: %s\tSupportedMaxVersion: %s\tFinalizedVersionLevel: %s\tEpoch: %s%n",
+          feature,
+          levelToString(feature, range.minVersion()),
+          levelToString(feature, range.maxVersion()),
+          levelToString(feature, finalizedLevel),
+          featureMetadata.finalizedFeaturesEpoch().asScala.flatMap(e => Some(e.toString)).getOrElse("-"))
     }
   }
 
-  def printReleaseFeatures(release: String): Unit = {
-    println(s"Default feature versions for release $release:")
+  def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = {
+    MetadataVersion.VERSIONS.toList.asJava.
+      subList(first.ordinal(), last.ordinal() + 1).
+      asScala.mkString(", ")
   }
 
-  def handleUpgrade(namespace: Namespace, admin: Admin): Unit = {
-    val featuresToUpgrade = parseFeaturesOrRelease(namespace) match {
-      case Features(featureNames) => parseVersions(featureNames, namespace)
-      case Release(release) => featuresForRelease(release)
-      case Neither() => throw new TerseFailure("Must specify either --release or at least one --feature and --version with upgrade sub-command.")
-      case Both() => throw new TerseFailure("Cannot specify both --release and --feature with upgrade sub-command.")
-    }
-
-    val dryRun = namespace.getBoolean("dry_run")
-    val updateResult = admin.updateFeatures(featuresToUpgrade.map { case (feature, version) =>
-      feature -> new FeatureUpdate(version, UpgradeType.UPGRADE)
-    }.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
-    handleUpdateFeaturesResponse(updateResult, featuresToUpgrade, dryRun, "upgrade")
+  def handleUpgrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
+    handleUpgradeOrDowngrade("upgrade", out, namespace, admin, UpgradeType.UPGRADE)
   }
 
-  def handleDowngrade(namespace: Namespace, admin: Admin): Unit = {
-    val featuresToDowngrade = parseFeaturesOrRelease(namespace) match {
-      case Features(featureNames) => parseVersions(featureNames, namespace)
-      case Neither() => throw new TerseFailure("Must specify at least one --feature and --version with downgrade sub-command.")
-      case _ => throw new IllegalStateException()
-    }
-
-    val dryRun = namespace.getBoolean("dry_run")
+  def downgradeType(namespace: Namespace): UpgradeType = {
     val unsafe = namespace.getBoolean("unsafe")
-    val updateResult = admin.updateFeatures(featuresToDowngrade.map { case (feature, version) =>
-      if (unsafe) {
-        feature -> new FeatureUpdate(version, UpgradeType.UNSAFE_DOWNGRADE)
-      } else {
-        feature -> new FeatureUpdate(version, UpgradeType.SAFE_DOWNGRADE)
-      }
-    }.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
+    if (unsafe == null || !unsafe) {
+      UpgradeType.SAFE_DOWNGRADE
+    } else {
+      UpgradeType.UNSAFE_DOWNGRADE
+    }
+  }
 
-    handleUpdateFeaturesResponse(updateResult, featuresToDowngrade, dryRun, "downgrade")
+  def handleDowngrade(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
+    handleUpgradeOrDowngrade("downgrade", out, namespace, admin, downgradeType(namespace))
   }
 
-  def handleDisable(namespace: Namespace, admin: Admin): Unit = {
-    val featuresToDisable = parseFeaturesOrRelease(namespace) match {
-      case Features(featureNames) => featureNames
-      case Neither() => throw new TerseFailure("Must specify at least one --feature and --version with downgrade sub-command.")
-      case _ => throw new IllegalStateException()
+  def parseNameAndLevel(input: String): (String, Short) = {
+    val equalsIndex = input.indexOf("=")
+    if (equalsIndex < 0) {
+      throw new TerseFailure(s"Can't parse feature=level string ${input}: equals sign not found.")
+    }
+    val name = input.substring(0, equalsIndex).trim
+    val levelString = input.substring(equalsIndex + 1).trim
+    val level = try {
+      levelString.toShort
+    } catch {
+      case e: Throwable => throw new TerseFailure(s"Can't parse feature=level string ${input}: " +
+        s"unable to parse ${levelString} as a short.")
     }
+    (name, level)
+  }
 
-    val dryRun = namespace.getBoolean("dry_run")
-    val unsafe = namespace.getBoolean("unsafe")
-    val updateResult = admin.updateFeatures(featuresToDisable.map { feature =>
-      if (unsafe) {
-        feature -> new FeatureUpdate(0.toShort, UpgradeType.UNSAFE_DOWNGRADE)
-      } else {
-        feature -> new FeatureUpdate(0.toShort, UpgradeType.SAFE_DOWNGRADE)
+  def handleUpgradeOrDowngrade(
+    op: String,
+    out: PrintStream,
+    namespace: Namespace,
+    admin: Admin,
+    upgradeType: UpgradeType
+  ): Unit = {
+    val updates = new java.util.HashMap[String, FeatureUpdate]()
+    Option(namespace.getString("metadata")).foreach(metadata => {
+      val version = try {
+        MetadataVersion.fromVersionString(metadata)
+      } catch {
+        case e: Throwable => throw new TerseFailure("Unsupported metadata version " + metadata +
+          ". Supported metadata versions are " + metadataVersionsToString(
+          MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, MetadataVersion.latest()))
       }
-    }.toMap.asJava, new UpdateFeaturesOptions().validateOnly(dryRun))
+      updates.put(MetadataVersion.FEATURE_NAME, new FeatureUpdate(version.featureLevel(), upgradeType))
+    })
+    Option(namespace.getList[String]("feature")).foreach(features => {
+      features.forEach(feature => {
+        val (name, level) = parseNameAndLevel(feature)
+        if (updates.put(name, new FeatureUpdate(level, upgradeType)) != null) {
+          throw new TerseFailure(s"Feature ${name} was specified more than once.")
+        }
+      })
+    })
+    update(op, out, admin, updates, namespace.getBoolean("dry-run"))
+  }
 
-    handleUpdateFeaturesResponse(updateResult, featuresToDisable.map {
-      feature => feature -> 0.toShort
-    }.toMap, dryRun, "disable")
+  def handleDisable(out: PrintStream, namespace: Namespace, admin: Admin): Unit = {
+    val upgradeType = downgradeType(namespace)
+    val updates = new java.util.HashMap[String, FeatureUpdate]()
+    Option(namespace.getList[String]("feature")).foreach(features => {
+      features.forEach(name =>
+        if (updates.put(name, new FeatureUpdate(0.toShort, upgradeType)) != null) {
+          throw new TerseFailure(s"Feature ${name} was specified more than once.")
+        })
+      }
+    )
+    update("disable", out, admin, updates, namespace.getBoolean("dry-run"))
   }
 
-  def handleUpdateFeaturesResponse(updateResult: UpdateFeaturesResult,
-                                   updatedFeatures: Map[String, Short],
-                                   dryRun: Boolean,
-                                   op: String): Unit = {
-    val errors = updateResult.values().asScala.map { case (feature, future) =>
+  def update(
+    op: String,
+    out: PrintStream,
+    admin: Admin,
+    updates: java.util.HashMap[String, FeatureUpdate],
+    dryRun: Boolean
+  ): Unit = {
+    if (updates.isEmpty) {
+      throw new TerseFailure(s"You must specify at least one feature to ${op}")
+    }
+    val result =  admin.updateFeatures(updates, new UpdateFeaturesOptions().validateOnly(dryRun))
+    val errors = result.values().asScala.map { case (feature, future) =>
       try {
         future.get()
         feature -> None
@@ -267,67 +288,34 @@ object FeatureCommand {
         case t: Throwable => feature -> Some(t)
       }
     }
-
-    errors.foreach { case (feature, maybeThrowable) =>
+    errors.keySet.toList.sorted.foreach { feature =>
+      val maybeThrowable = errors(feature)
+      val level = updates.get(feature).maxVersionLevel()
       if (maybeThrowable.isDefined) {
-        if (dryRun) {
-          System.out.println(s"Can not $op feature '$feature' to ${updatedFeatures(feature)}. ${maybeThrowable.get.getMessage}")
+        val helper = if (dryRun) {
+          "Can not"
         } else {
-          System.out.println(s"Could not $op feature '$feature' to ${updatedFeatures(feature)}. ${maybeThrowable.get.getMessage}")
+          "Could not"
         }
-      } else {
-        if (dryRun) {
-          System.out.println(s"Feature '$feature' can be ${op}d to ${updatedFeatures(feature)}.")
+        val suffix = if (op.equals("disable")) {
+          s"disable ${feature}"
         } else {
-          System.out.println(s"Feature '$feature' was ${op}d to ${updatedFeatures(feature)}.")
+          s"${op} ${feature} to ${level}"
         }
-      }
-    }
-  }
-
-  sealed trait ReleaseOrFeatures { }
-  case class Neither() extends ReleaseOrFeatures
-  case class Release(release: String) extends ReleaseOrFeatures
-  case class Features(featureNames: Seq[String]) extends ReleaseOrFeatures
-  case class Both() extends ReleaseOrFeatures
-
-  def parseFeaturesOrRelease(namespace: Namespace): ReleaseOrFeatures = {
-    val release = namespace.getString("release")
-    val features = namespace.getList[String]("feature").asScala
-
-    if (release != null && features != null) {
-      Both()
-    } else if (release == null && features == null) {
-      Neither()
-    } else if (release != null) {
-      Release(release)
-    } else {
-      Features(features)
-    }
-  }
-
-  def parseVersions(features: Seq[String], namespace: Namespace): Map[String, Short] = {
-    val versions = namespace.getList[Short]("version").asScala
-    if (versions == null) {
-      throw new TerseFailure("Must specify --version when using --feature argument(s).")
-    }
-    if (versions.size != features.size) {
-      if (versions.size > features.size) {
-        throw new TerseFailure("Too many --version arguments given. For each --feature argument there should be one --version argument.")
+        out.println(s"${helper} ${suffix}. ${maybeThrowable.get.getMessage}")
       } else {
-        throw new TerseFailure("Too many --feature arguments given. For each --feature argument there should be one --version argument.")
+        val verb = if (dryRun) {
+          "can be"
+        } else {
+          "was"
+        }
+        val obj = if (op.equals("disable")) {
+          "disabled."
+        } else {
+          s"${op}d to ${level}."
+        }
+        out.println(s"${feature} ${verb} ${obj}")
       }
     }
-    features.zip(versions).map { case (feature, version) =>
-      feature -> version
-    }.toMap
-  }
-
-  def defaultFeatures(): Map[String, Short] = {
-    Map.empty
-  }
-
-  def featuresForRelease(release: String): Map[String, Short] = {
-    Map.empty
   }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
index ac715d217b..eaf9017463 100644
--- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -17,94 +17,296 @@
 
 package kafka.admin
 
-import kafka.server.{BaseRequestTest, KafkaConfig, KafkaServer}
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils.waitUntilTrue
-import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.tools.TerseFailure
+import kafka.utils.{TestInfoUtils, TestUtils}
+import net.sourceforge.argparse4j.inf.Namespace
+import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.{SAFE_DOWNGRADE, UNSAFE_DOWNGRADE}
+import org.apache.kafka.clients.admin.MockAdminClient
 import org.apache.kafka.common.utils.Utils
-import java.util.Properties
-
-import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV0
-import org.junit.jupiter.api.Assertions.assertTrue
+import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.{lang, util}
+import java.util.Collections.{emptyMap, singletonMap}
+import scala.jdk.CollectionConverters._
+
+case class FeatureCommandTestEnv(admin: MockAdminClient = null) extends AutoCloseable {
+  val stream = new ByteArrayOutputStream()
+  val out = new PrintStream(stream)
+
+  override def close(): Unit = {
+    Utils.closeAll(stream, out)
+    Utils.closeQuietly(admin, "admin")
+  }
+
+  def outputWithoutEpoch(): String = {
+    val lines = stream.toString.split(String.format("%n"))
+    lines.map { line =>
+      val pos = line.indexOf("Epoch: ")
+      if (pos > 0) {
+        line.substring(0, pos)
+      } else {
+        line
+      }
+    }.mkString(String.format("%n"))
+  }
+}
+
+class FeatureCommandTest extends IntegrationTestHarness {
+  override def brokerCount: Int = 1
+
+  override protected def metadataVersion: MetadataVersion = IBP_3_3_IV1
+
+  serverConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, metadataVersion.toString)
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testDescribeWithZk(quorum: String): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe"), env.out)
+      assertEquals("", env.outputWithoutEpoch())
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft"))
+  def testDescribeWithKRaft(quorum: String): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe"), env.out)
+      assertEquals(String.format(
+        "Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
+          "SupportedMaxVersion: 3.3-IV3\tFinalizedVersionLevel: 3.3-IV1\t"),
+            env.outputWithoutEpoch())
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testUpgradeMetadataVersionWithZk(quorum: String): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "upgrade", "--metadata", "3.3-IV2"), env.out)
+      assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " +
+        "update because the provided feature is not supported.", env.outputWithoutEpoch())
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft"))
+  def testUpgradeMetadataVersionWithKraft(quorum: String): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "upgrade", "--feature", "metadata.version=5"), env.out)
+      assertEquals("metadata.version was upgraded to 5.", env.outputWithoutEpoch())
+    }
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "upgrade", "--metadata", "3.3-IV2"), env.out)
+      assertEquals("metadata.version was upgraded to 6.", env.outputWithoutEpoch())
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testDowngradeMetadataVersionWithZk(quorum: String): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "disable", "--feature", "metadata.version"), env.out)
+      assertEquals("Could not disable metadata.version. Can not delete non-existing finalized feature.",
+        env.outputWithoutEpoch())
+    }
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "downgrade", "--metadata", "3.3-IV0"), env.out)
+      assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
+        "update because the provided feature is not supported.", env.outputWithoutEpoch())
+    }
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "downgrade", "--unsafe", "--metadata", "3.3-IV0"), env.out)
+      assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
+        "update because the provided feature is not supported.", env.outputWithoutEpoch())
+    }
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft"))
+  def testDowngradeMetadataVersionWithKRaft(quorum: String): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "disable", "--feature", "metadata.version"), env.out)
+      assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
+        "metadata.version. Local controller 1000 only supports versions 1-7", env.outputWithoutEpoch())
+    }
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "downgrade", "--metadata", "3.3-IV0"), env.out)
+      assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
+        "Refusing to perform the requested downgrade because it might delete metadata information. " +
+        "Retry using UNSAFE_DOWNGRADE if you want to force the downgrade to proceed.", env.outputWithoutEpoch())
+    }
+    TestUtils.resource(FeatureCommandTestEnv()) { env =>
+      FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
+        "downgrade", "--unsafe", "--metadata", "3.3-IV0"), env.out)
+      assertEquals("metadata.version was downgraded to 4.", env.outputWithoutEpoch())
+    }
+  }
+}
+
+class FeatureCommandUnitTest {
+  @Test
+  def testLevelToString(): Unit = {
+    assertEquals("5", FeatureCommand.levelToString("foo.bar", 5.toShort))
+    assertEquals("3.3-IV0",
+      FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, IBP_3_3_IV0.featureLevel()))
+  }
+
+  @Test
+  def testMetadataVersionsToString(): Unit = {
+    assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3",
+      FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_3_IV3))
+  }
+
+  @Test
+  def testdowngradeType(): Unit = {
+    assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(
+      new Namespace(singletonMap("unsafe", java.lang.Boolean.valueOf(false)))))
+    assertEquals(UNSAFE_DOWNGRADE, FeatureCommand.downgradeType(
+      new Namespace(singletonMap("unsafe", java.lang.Boolean.valueOf(true)))))
+    assertEquals(SAFE_DOWNGRADE, FeatureCommand.downgradeType(new Namespace(emptyMap())))
+  }
+
+  @Test
+  def testParseNameAndLevel(): Unit = {
+    assertEquals(("foo.bar", 5.toShort), FeatureCommand.parseNameAndLevel("foo.bar=5"))
+    assertEquals(("quux", 0.toShort), FeatureCommand.parseNameAndLevel(" quux=0"))
+    assertEquals("Can't parse feature=level string baaz: equals sign not found.",
+      assertThrows(classOf[TerseFailure],
+        () => FeatureCommand.parseNameAndLevel("baaz")).getMessage)
+    assertEquals("Can't parse feature=level string w=tf: unable to parse tf as a short.",
+      assertThrows(classOf[TerseFailure],
+        () => FeatureCommand.parseNameAndLevel("w=tf")).getMessage)
+  }
+
+  def buildAdminClient1(): MockAdminClient = {
+    new MockAdminClient.Builder().
+      minSupportedFeatureLevels(Map(
+        MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV0.featureLevel()),
+        "foo.bar" -> lang.Short.valueOf(0.toShort)
+      ).asJava).
+      featureLevels(Map(
+        MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV2.featureLevel()),
+        "foo.bar" -> lang.Short.valueOf(5.toShort)
+      ).asJava).
+      maxSupportedFeatureLevels(Map(
+        MetadataVersion.FEATURE_NAME -> lang.Short.valueOf(IBP_3_3_IV3.featureLevel()),
+        "foo.bar" -> lang.Short.valueOf(10.toShort)
+      ).asJava).
+      build()
+  }
+
+  @Test
+  def testHandleDescribe(): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
+      FeatureCommand.handleDescribe(env.out, env.admin)
+      assertEquals(String.format(
+        "Feature: foo.bar\tSupportedMinVersion: 0\tSupportedMaxVersion: 10\tFinalizedVersionLevel: 5\tEpoch: 123%n" +
+        "Feature: metadata.version\tSupportedMinVersion: 3.3-IV0\tSupportedMaxVersion: 3.3-IV3\tFinalizedVersionLevel: 3.3-IV2\tEpoch: 123%n"),
+        env.stream.toString)
+    }
+  }
+
+  @Test
+  def testHandleUpgrade(): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
+      FeatureCommand.handleUpgrade(env.out, new Namespace(Map(
+        "metadata" -> "3.3-IV1",
+        "feature" -> util.Arrays.asList("foo.bar=6")
+      ).asJava), env.admin)
+      assertEquals(String.format(
+        "foo.bar was upgraded to 6.%n" +
+        "Could not upgrade metadata.version to 5. Can't upgrade to lower version.%n"),
+        env.stream.toString)
+    }
+  }
+
+  @Test
+  def testHandleUpgradeDryRun(): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
+      FeatureCommand.handleUpgrade(env.out, new Namespace(Map(
+        "metadata" -> "3.3-IV1",
+        "feature" -> util.Arrays.asList("foo.bar=6"),
+        "dry-run" -> java.lang.Boolean.valueOf(true)
+      ).asJava), env.admin)
+      assertEquals(String.format(
+        "foo.bar can be upgraded to 6.%n" +
+        "Can not upgrade metadata.version to 5. Can't upgrade to lower version.%n"),
+        env.stream.toString)
+    }
+  }
+
+  @Test
+  def testHandleDowngrade(): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
+      FeatureCommand.handleDowngrade(env.out, new Namespace(Map(
+        "metadata" -> "3.3-IV3",
+        "feature" -> util.Arrays.asList("foo.bar=1")
+      ).asJava), env.admin)
+      assertEquals(String.format(
+        "foo.bar was downgraded to 1.%n" +
+        "Could not downgrade metadata.version to 7. Can't downgrade to newer version.%n"),
+        env.stream.toString)
+    }
+  }
+
+  @Test
+  def testHandleDowngradeDryRun(): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
+      FeatureCommand.handleDowngrade(env.out, new Namespace(Map(
+        "metadata" -> "3.3-IV3",
+        "feature" -> util.Arrays.asList("foo.bar=1"),
+        "dry-run" -> java.lang.Boolean.valueOf(true)
+      ).asJava), env.admin)
+      assertEquals(String.format(
+        "foo.bar can be downgraded to 1.%n" +
+        "Can not downgrade metadata.version to 7. Can't downgrade to newer version.%n"),
+        env.stream.toString)
+    }
+  }
+
+  @Test
+  def testHandleDisable(): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
+      FeatureCommand.handleDisable(env.out, new Namespace(Map[String, AnyRef](
+        "feature" -> util.Arrays.asList("foo.bar", "metadata.version", "quux")
+      ).asJava), env.admin)
+      assertEquals(String.format(
+        "foo.bar was disabled.%n" +
+        "Could not disable metadata.version. Can't downgrade below 4%n" +
+        "quux was disabled.%n"),
+        env.stream.toString)
+    }
+  }
 
-class FeatureCommandTest extends BaseRequestTest {
-  override def brokerCount: Int = 3
-
-  override def brokerPropertyOverrides(props: Properties): Unit = {
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp, IBP_2_7_IV0.toString)
-  }
-
-  private val defaultSupportedFeatures: Features[SupportedVersionRange] =
-    Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
-                                           Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5))))
-
-  private def updateSupportedFeatures(features: Features[SupportedVersionRange],
-                                      targetServers: Set[KafkaServer]): Unit = {
-    targetServers.foreach(s => {
-      s.brokerFeatures.setSupportedFeatures(features)
-      s.zkClient.updateBrokerInfo(s.createBrokerInfo)
-    })
-
-    // Wait until updates to all BrokerZNode supported features propagate to the controller.
-    val brokerIds = targetServers.map(s => s.config.brokerId)
-    waitUntilTrue(
-      () => servers.exists(s => {
-        if (s.kafkaController.isActive) {
-          s.kafkaController.controllerContext.liveOrShuttingDownBrokers
-            .filter(b => brokerIds.contains(b.id))
-            .forall(b => {
-              b.features.equals(features)
-            })
-        } else {
-          false
-        }
-      }),
-      "Controller did not get broker updates")
-  }
-
-  private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = {
-    updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
-  }
-
-  /**
-   * Tests if the FeatureApis#describeFeatures API works as expected when describing features before and
-   * after upgrading features.
-   */
   @Test
-  def testDescribeFeaturesSuccess(): Unit = {
-    updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
-
-    val initialDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
-    val expectedInitialDescribeOutputs = Seq(
-      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: -",
-      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: -"
-    )
-
-    expectedInitialDescribeOutputs.foreach { expectedOutput =>
-      assertTrue(initialDescribeOutput.contains(expectedOutput))
-    }
-
-    FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "upgrade",
-      "--feature", "feature_1", "--version", "3", "--feature", "feature_2", "--version", "5"))
-    val upgradeDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
-    val expectedUpgradeDescribeOutput = Seq(
-      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: 3",
-      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: 5"
-    )
-    expectedUpgradeDescribeOutput.foreach { expectedOutput =>
-      assertTrue(upgradeDescribeOutput.contains(expectedOutput))
-    }
-
-    FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "downgrade",
-      "--feature", "feature_1", "--version", "2", "--feature", "feature_2", "--version", "2"))
-    val downgradeDescribeOutput = TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "describe")))
-    val expectedFinalDescribeOutput = Seq(
-      "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedVersionLevel: 2",
-      "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedVersionLevel: 2"
-    )
-    expectedFinalDescribeOutput.foreach { expectedOutput =>
-      assertTrue(downgradeDescribeOutput.contains(expectedOutput))
+  def testHandleDisableDryRun(): Unit = {
+    TestUtils.resource(FeatureCommandTestEnv(buildAdminClient1())) { env =>
+      FeatureCommand.handleDisable(env.out, new Namespace(Map[String, AnyRef](
+        "feature" -> util.Arrays.asList("foo.bar", "metadata.version", "quux"),
+        "dry-run" -> java.lang.Boolean.valueOf(true)
+      ).asJava), env.admin)
+      assertEquals(String.format(
+        "foo.bar can be disabled.%n" +
+        "Can not disable metadata.version. Can't downgrade below 4%n" +
+        "quux can be disabled.%n"),
+        env.stream.toString)
     }
   }
 }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7d1e71efdb..86583342fd 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -61,8 +61,27 @@
     </li>
 </ol>
 
+<h4><a id="upgrade_3_3_0" href="#upgrade_3_3_0">Upgrading a KRaft-based cluster to 3.3.0 from any version 3.0.x through 3.2.x</a></h4>
+
+<p><b>If you are upgrading from a version prior to 3.3.0, please see the note below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li>Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
+        brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
+    </li>
+    <li>Once the cluster's behavior and performance has been verified, bump the metadata.version by running
+        <code>
+        ./bin/kafka-features.sh upgrade --metadata 3.3
+        </code>
+    </li>
+    <li>Note that the cluster metadata version cannot be downgraded to a pre-production 3.0.x, 3.1.x, or 3.2.x version once it has been upgraded. However, it is possible to downgrade to production versions such as 3.3-IV0, 3.3-IV1, etc.</li>
+</ol>
+
 <h5><a id="upgrade_330_notable" href="#upgrade_330_notable">Notable changes in 3.3.0</a></h5>
     <ul>
+        <li>There is now a slightly different upgrade process for KRaft clusters than for ZK-based clusters, as described above.</li>
         <li>Introduced a new API <code>addMetricIfAbsent</code> to <code>Metrics</code> which would create a new Metric if not existing or return the same metric
             if already registered. Note that this behaviour is different from <code>addMetric</code> API which throws an <code>IllegalArgumentException</code> when
             trying to create an already existing metric. (See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-843%3A+Adding+addMetricIfAbsent+method+to+Metrics">KIP-843</a>
@@ -2035,3 +2054,4 @@ Release 0.7 is incompatible with newer releases. Major changes were made to the
 </script>
 
 <div class="p-upgrade"></div>
+</html>
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 041578b8d0..d3fed586b1 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -850,6 +850,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         if len(self.pids(node)) == 0:
             raise Exception("No process ids recorded on node %s" % node.account.hostname)
 
+    def upgrade_metadata_version(self, new_version):
+        self.run_features_command("upgrade", new_version)
+
+    def downgrade_metadata_version(self, new_version):
+        self.run_features_command("downgrade", new_version)
+
+    def run_features_command(self, op, new_version):
+        cmd = self.path.script("kafka-features.sh ")
+        cmd += "--bootstrap-server %s " % self.bootstrap_servers()
+        cmd += "%s --metadata %s" % (op, new_version)
+        self.logger.info("Running %s command...\n%s" % (op, cmd))
+        self.nodes[0].account.ssh(cmd)
+
     def pids(self, node):
         """Return process ids associated with running processes on the given node."""
         try:
diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py b/tests/kafkatest/tests/core/kraft_upgrade_test.py
new file mode 100644
index 0000000000..cf9ea47e4f
--- /dev/null
+++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.quorum import remote_kraft, colocated_kraft
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import LATEST_3_0, LATEST_3_1, LATEST_3_2, DEV_BRANCH, \
+    KafkaVersion, LATEST_METADATA_VERSION
+
+#
+# Test upgrading between different KRaft versions.
+#
+# Note that the earliest supported KRaft version is 3.0, not 0.8 as it is for
+# ZK mode. The upgrade process is also somewhat different for KRaft because we
+# use metadata.version instead of inter.broker.protocol.
+#
+class TestKRaftUpgrade(ProduceConsumeValidateTest):
+
+    def __init__(self, test_context):
+        super(TestKRaftUpgrade, self).__init__(test_context=test_context)
+        self.may_truncate_acked_records = False
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def wait_until_rejoin(self):
+        for partition in range(0, self.partitions):
+            wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
+                    backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
+
+    def perform_version_change(self, from_kafka_version):
+        self.logger.info("Performing rolling upgrade.")
+        for node in self.kafka.controller_quorum.nodes:
+            self.logger.info("Stopping controller node %s" % node.account.hostname)
+            self.kafka.controller_quorum.stop_node(node)
+            node.version = DEV_BRANCH
+            self.logger.info("Restarting controller node %s" % node.account.hostname)
+            self.kafka.controller_quorum.start_node(node)
+            self.wait_until_rejoin()
+            self.logger.info("Successfully restarted controller node %s" % node.account.hostname)
+        for node in self.kafka.nodes:
+            self.logger.info("Stopping broker node %s" % node.account.hostname)
+            self.kafka.stop_node(node)
+            node.version = DEV_BRANCH
+            self.logger.info("Restarting broker node %s" % node.account.hostname)
+            self.kafka.start_node(node)
+            self.wait_until_rejoin()
+            self.logger.info("Successfully restarted broker node %s" % node.account.hostname)
+        self.logger.info("Changing metadata.version to %s" % LATEST_METADATA_VERSION)
+        self.kafka.upgrade_metadata_version(LATEST_METADATA_VERSION)
+
+    def run_upgrade(self, from_kafka_version):
+        """Test upgrade of Kafka broker cluster from various versions to the current version
+
+        from_kafka_version is a Kafka version to upgrade from.
+
+        - Start 3 node broker cluster on version 'from_kafka_version'.
+        - Start producer and consumer in the background.
+        - Perform rolling upgrade.
+        - Upgrade cluster to the latest metadata.version.
+        - Finally, validate that every message acked by the producer was consumed by the consumer.
+        """
+        fromKafkaVersion = KafkaVersion(from_kafka_version)
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=3,
+                                  zk=None,
+                                  version=fromKafkaVersion,
+                                  topics={self.topic: {"partitions": self.partitions,
+                                                       "replication-factor": self.replication_factor,
+                                                       'configs': {"min.insync.replicas": 2}}})
+        self.kafka.start()
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           compression_types=["none"],
+                                           version=KafkaVersion(from_kafka_version))
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, new_consumer=True, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=KafkaVersion(from_kafka_version))
+        self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version))
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.check_protocol_errors(self)
+
+    @cluster(num_nodes=5)
+    @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=colocated_kraft)
+    @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=colocated_kraft)
+    def test_colocated_upgrade(self, from_kafka_version, metadata_quorum):
+        self.run_upgrade(from_kafka_version)
+
+    @cluster(num_nodes=8)
+    @parametrize(from_kafka_version=str(LATEST_3_1), metadata_quorum=remote_kraft)
+    @parametrize(from_kafka_version=str(LATEST_3_2), metadata_quorum=remote_kraft)
+    def test_non_colocated_upgrade(self, from_kafka_version, metadata_quorum):
+        self.run_upgrade(from_kafka_version)
+
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 64f0bf2c53..29f1b4e841 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -121,6 +121,8 @@ def get_version(node=None):
 DEV_BRANCH = KafkaVersion("dev")
 DEV_VERSION = KafkaVersion("3.3.0-SNAPSHOT")
 
+LATEST_METADATA_VERSION = "3.3"
+
 # 0.8.2.x versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
 V_0_8_2_2 = KafkaVersion("0.8.2.2")