You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/15 00:09:54 UTC

[GitHub] [kafka] junrao commented on a change in pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system

junrao commented on a change in pull request #9409:
URL: https://github.com/apache/kafka/pull/9409#discussion_r505019265



##########
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+    val opts = new FeatureCommandOptions(args)
+    val featureApis = new FeatureApis(opts)
+    var exitCode = 0
+    try {
+      featureApis.execute()
+    } catch {
+      case e: IllegalArgumentException =>
+        printException(e)
+        opts.parser.printHelpOn(System.err)
+        exitCode = 1
+      case _: UpdateFeaturesException =>
+        exitCode = 1
+      case e: Throwable =>
+        printException(e)
+        exitCode = 1
+    } finally {
+      featureApis.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  private def printException(exception: Throwable): Unit = {
+    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+    f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+    opts = newOpts
+  }
+
+  private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
+    val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+    adminClient.describeFeatures(options).featureMetadata().get()
+  }
+
+  /**
+   * Describes the supported and finalized features. If the --from-controller CLI option
+   * is provided, then the request is issued only to the controller, otherwise the request is issued
+   * to any of the provided bootstrap servers.
+   */
+  def describeFeatures(): Unit = {
+    val result = describeFeatures(opts.hasFromControllerOption)
+    val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
+
+    features.toList.sorted.foreach {
+      feature =>
+        val output = new StringBuilder()
+        output.append(s"Feature: $feature")
+
+        val (supportedMinVersion, supportedMaxVersion) = {
+          val supportedVersionRange = result.supportedFeatures.get(feature)
+          if (supportedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
+          }
+        }
+        output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+        output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+        val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+          val finalizedVersionRange = result.finalizedFeatures.get(feature)
+          if (finalizedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
+          }
+        }
+        output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
+        output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+
+        val epoch = {
+          if (result.finalizedFeaturesEpoch.isPresent) {
+            result.finalizedFeaturesEpoch.get.toString
+          } else {
+            "-"
+          }
+        }
+        output.append(s"\tEpoch: $epoch")
+
+        println(output)
+    }
+  }
+
+  /**
+   * Upgrades all features known to this tool to their highest max version levels. The method may
+   * add new finalized features if they were not finalized previously, but it does not delete
+   * any existing finalized feature. The results of the feature updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def upgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val updates = supportedFeatures.features.asScala.map {
+      case (feature, targetVersionRange) =>
+        val existingVersionRange = existingFinalizedFeatures.get(feature)
+        if (existingVersionRange == null) {
+          val updateStr =
+            addOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: -" +
+            s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+          (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+        } else {
+          if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              upgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Downgrades existing finalized features to the highest max version levels known to this tool.
+   * The method may delete existing finalized features if they are no longer seen to be supported,
+   * but it does not add a feature that was not finalized previously. The results of the feature
+   * updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def downgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val supportedFeaturesMap = supportedFeatures.features
+    val updates = existingFinalizedFeatures.asScala.map {
+      case (feature, existingVersionRange) =>
+        val targetVersionRange = supportedFeaturesMap.get(feature)
+        if (targetVersionRange == null) {
+          val updateStr =
+            deleteOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+            s"\tNewFinalizedMaxVersion: -"
+          (feature, (updateStr, new FeatureUpdate(0, true)))
+        } else {
+          if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              downgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, true)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Applies the provided feature updates. If the --dry-run CLI option is provided, the method
+   * only prints the expected feature updates to STDOUT without applying them.
+   *
+   * @param updates the feature updates to be applied via the admin client
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
+    if (opts.hasDryRunOption) {
+      println("Expected feature updates:")
+      println(ListMap(updates.toSeq.sortBy(_._1):_*)
+                .map { case(_, (updateStr, _)) => updateStr}
+                .mkString("\n"))
+    } else {
+      val result = adminClient.updateFeatures(
+        updates.map { case(feature, (_, update)) => (feature, update)}.asJava,
+        new UpdateFeaturesOptions())
+      val failures = ListMap(result.values.asScala.toSeq.sortBy(_._1):_*).map {
+        case (feature, updateFuture) =>
+          val (updateStr, _) = updates(feature)
+          try {
+            updateFuture.get
+            println(updateStr + "\tResult: OK")
+            0
+          } catch {
+            case e: Exception =>
+              println(updateStr + "\tResult: FAILED due to " + e.getMessage)
+              1
+          }
+      }.sum
+      if (failures > 0) {
+        throw new UpdateFeaturesException(s"$failures feature updates failed!")
+      }
+    }
+  }
+
+  def execute(): Unit = {
+    if (opts.hasDescribeOption) {
+      describeFeatures()
+    } else if (opts.hasUpgradeAllOption) {
+      upgradeAllFeatures()
+    } else if (opts.hasDowngradeAllOption) {
+      downgradeAllFeatures()
+    } else {
+      throw new IllegalStateException("Unexpected state: no CLI command could be executed.")
+    }
+  }
+
+  def close(): Unit = {
+    adminClient.close()
+  }
+
+  private def createAdminClient(): Admin = {
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServers)
+    Admin.create(props)
+  }
+}
+
+class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+  private val bootstrapServerOpt =
+    parser.accepts("bootstrap-server", "REQUIRED: The Kafka server(s) to connect to.")

Review comment:
       Perhaps we can mention in the description that the expected format is a host:port list.

##########
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+    val opts = new FeatureCommandOptions(args)
+    val featureApis = new FeatureApis(opts)
+    var exitCode = 0
+    try {
+      featureApis.execute()
+    } catch {
+      case e: IllegalArgumentException =>
+        printException(e)
+        opts.parser.printHelpOn(System.err)
+        exitCode = 1
+      case _: UpdateFeaturesException =>
+        exitCode = 1
+      case e: Throwable =>
+        printException(e)
+        exitCode = 1
+    } finally {
+      featureApis.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  private def printException(exception: Throwable): Unit = {
+    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+    f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+    opts = newOpts
+  }
+
+  private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
+    val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+    adminClient.describeFeatures(options).featureMetadata().get()
+  }
+
+  /**
+   * Describes the supported and finalized features. If the --from-controller CLI option
+   * is provided, then the request is issued only to the controller, otherwise the request is issued
+   * to any of the provided bootstrap servers.
+   */
+  def describeFeatures(): Unit = {
+    val result = describeFeatures(opts.hasFromControllerOption)
+    val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
+
+    features.toList.sorted.foreach {
+      feature =>
+        val output = new StringBuilder()
+        output.append(s"Feature: $feature")
+
+        val (supportedMinVersion, supportedMaxVersion) = {
+          val supportedVersionRange = result.supportedFeatures.get(feature)
+          if (supportedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
+          }
+        }
+        output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+        output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+        val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+          val finalizedVersionRange = result.finalizedFeatures.get(feature)
+          if (finalizedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
+          }
+        }
+        output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
+        output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+
+        val epoch = {
+          if (result.finalizedFeaturesEpoch.isPresent) {
+            result.finalizedFeaturesEpoch.get.toString
+          } else {
+            "-"
+          }
+        }
+        output.append(s"\tEpoch: $epoch")
+
+        println(output)
+    }
+  }
+
+  /**
+   * Upgrades all features known to this tool to their highest max version levels. The method may
+   * add new finalized features if they were not finalized previously, but it does not delete
+   * any existing finalized feature. The results of the feature updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def upgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val updates = supportedFeatures.features.asScala.map {
+      case (feature, targetVersionRange) =>
+        val existingVersionRange = existingFinalizedFeatures.get(feature)
+        if (existingVersionRange == null) {
+          val updateStr =
+            addOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: -" +
+            s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+          (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+        } else {
+          if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              upgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+          } else {
+            (feature, null)

Review comment:
       Perhaps we could use Option to avoid null?

##########
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+    val opts = new FeatureCommandOptions(args)
+    val featureApis = new FeatureApis(opts)
+    var exitCode = 0
+    try {
+      featureApis.execute()
+    } catch {
+      case e: IllegalArgumentException =>
+        printException(e)
+        opts.parser.printHelpOn(System.err)
+        exitCode = 1
+      case _: UpdateFeaturesException =>
+        exitCode = 1
+      case e: Throwable =>
+        printException(e)
+        exitCode = 1
+    } finally {
+      featureApis.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  private def printException(exception: Throwable): Unit = {
+    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+    f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+    opts = newOpts
+  }
+
+  private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
+    val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+    adminClient.describeFeatures(options).featureMetadata().get()
+  }
+
+  /**
+   * Describes the supported and finalized features. If the --from-controller CLI option
+   * is provided, then the request is issued only to the controller, otherwise the request is issued
+   * to any of the provided bootstrap servers.
+   */
+  def describeFeatures(): Unit = {
+    val result = describeFeatures(opts.hasFromControllerOption)
+    val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
+
+    features.toList.sorted.foreach {
+      feature =>
+        val output = new StringBuilder()
+        output.append(s"Feature: $feature")
+
+        val (supportedMinVersion, supportedMaxVersion) = {
+          val supportedVersionRange = result.supportedFeatures.get(feature)
+          if (supportedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
+          }
+        }
+        output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+        output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+        val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+          val finalizedVersionRange = result.finalizedFeatures.get(feature)
+          if (finalizedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
+          }
+        }
+        output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
+        output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+
+        val epoch = {
+          if (result.finalizedFeaturesEpoch.isPresent) {
+            result.finalizedFeaturesEpoch.get.toString
+          } else {
+            "-"
+          }
+        }
+        output.append(s"\tEpoch: $epoch")
+
+        println(output)
+    }
+  }
+
+  /**
+   * Upgrades all features known to this tool to their highest max version levels. The method may
+   * add new finalized features if they were not finalized previously, but it does not delete
+   * any existing finalized feature. The results of the feature updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def upgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val updates = supportedFeatures.features.asScala.map {
+      case (feature, targetVersionRange) =>
+        val existingVersionRange = existingFinalizedFeatures.get(feature)
+        if (existingVersionRange == null) {
+          val updateStr =
+            addOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: -" +
+            s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+          (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+        } else {
+          if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              upgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Downgrades existing finalized features to the highest max version levels known to this tool.
+   * The method may delete existing finalized features if they are no longer seen to be supported,
+   * but it does not add a feature that was not finalized previously. The results of the feature
+   * updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def downgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val supportedFeaturesMap = supportedFeatures.features
+    val updates = existingFinalizedFeatures.asScala.map {
+      case (feature, existingVersionRange) =>
+        val targetVersionRange = supportedFeaturesMap.get(feature)
+        if (targetVersionRange == null) {
+          val updateStr =
+            deleteOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+            s"\tNewFinalizedMaxVersion: -"
+          (feature, (updateStr, new FeatureUpdate(0, true)))
+        } else {
+          if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              downgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, true)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Applies the provided feature updates. If the --dry-run CLI option is provided, the method
+   * only prints the expected feature updates to STDOUT without applying them.
+   *
+   * @param updates the feature updates to be applied via the admin client
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
+    if (opts.hasDryRunOption) {
+      println("Expected feature updates:")
+      println(ListMap(updates.toSeq.sortBy(_._1):_*)

Review comment:
       Could we use a `case` pattern here to avoid the unnamed reference `_._1`?

##########
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+    val opts = new FeatureCommandOptions(args)
+    val featureApis = new FeatureApis(opts)
+    var exitCode = 0
+    try {
+      featureApis.execute()
+    } catch {
+      case e: IllegalArgumentException =>
+        printException(e)
+        opts.parser.printHelpOn(System.err)
+        exitCode = 1
+      case _: UpdateFeaturesException =>
+        exitCode = 1
+      case e: Throwable =>
+        printException(e)
+        exitCode = 1
+    } finally {
+      featureApis.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  private def printException(exception: Throwable): Unit = {
+    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+    f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+    opts = newOpts
+  }
+
+  private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
+    val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+    adminClient.describeFeatures(options).featureMetadata().get()
+  }
+
+  /**
+   * Describes the supported and finalized features. If the --from-controller CLI option
+   * is provided, then the request is issued only to the controller, otherwise the request is issued
+   * to any of the provided bootstrap servers.
+   */
+  def describeFeatures(): Unit = {
+    val result = describeFeatures(opts.hasFromControllerOption)
+    val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
+
+    features.toList.sorted.foreach {
+      feature =>
+        val output = new StringBuilder()
+        output.append(s"Feature: $feature")
+
+        val (supportedMinVersion, supportedMaxVersion) = {
+          val supportedVersionRange = result.supportedFeatures.get(feature)
+          if (supportedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
+          }
+        }
+        output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+        output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+        val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+          val finalizedVersionRange = result.finalizedFeatures.get(feature)
+          if (finalizedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
+          }
+        }
+        output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
+        output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+
+        val epoch = {
+          if (result.finalizedFeaturesEpoch.isPresent) {
+            result.finalizedFeaturesEpoch.get.toString
+          } else {
+            "-"
+          }
+        }
+        output.append(s"\tEpoch: $epoch")
+
+        println(output)
+    }
+  }
+
+  /**
+   * Upgrades all features known to this tool to their highest max version levels. The method may
+   * add new finalized features if they were not finalized previously, but it does not delete
+   * any existing finalized feature. The results of the feature updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def upgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val updates = supportedFeatures.features.asScala.map {
+      case (feature, targetVersionRange) =>
+        val existingVersionRange = existingFinalizedFeatures.get(feature)
+        if (existingVersionRange == null) {
+          val updateStr =
+            addOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: -" +
+            s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+          (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+        } else {
+          if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              upgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Downgrades existing finalized features to the highest max version levels known to this tool.
+   * The method may delete existing finalized features if they are no longer seen to be supported,
+   * but it does not add a feature that was not finalized previously. The results of the feature
+   * updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def downgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val supportedFeaturesMap = supportedFeatures.features
+    val updates = existingFinalizedFeatures.asScala.map {
+      case (feature, existingVersionRange) =>
+        val targetVersionRange = supportedFeaturesMap.get(feature)
+        if (targetVersionRange == null) {
+          val updateStr =
+            deleteOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+            s"\tNewFinalizedMaxVersion: -"
+          (feature, (updateStr, new FeatureUpdate(0, true)))
+        } else {
+          if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              downgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, true)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Applies the provided feature updates. If the --dry-run CLI option is provided, the method
+   * only prints the expected feature updates to STDOUT without applying them.
+   *
+   * @param updates the feature updates to be applied via the admin client
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
+    if (opts.hasDryRunOption) {
+      println("Expected feature updates:")
+      println(ListMap(updates.toSeq.sortBy(_._1):_*)
+                .map { case(_, (updateStr, _)) => updateStr}
+                .mkString("\n"))
+    } else {
+      val result = adminClient.updateFeatures(
+        updates.map { case(feature, (_, update)) => (feature, update)}.asJava,
+        new UpdateFeaturesOptions())
+      val failures = ListMap(result.values.asScala.toSeq.sortBy(_._1):_*).map {
+        case (feature, updateFuture) =>
+          val (updateStr, _) = updates(feature)
+          try {
+            updateFuture.get
+            println(updateStr + "\tResult: OK")
+            0
+          } catch {
+            case e: Exception =>
+              println(updateStr + "\tResult: FAILED due to " + e.getMessage)

Review comment:
       Do we need to get the cause from the ExecutionException thrown by updateFuture.get, rather than printing the wrapper's message?

##########
File path: core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
##########
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.server.{BaseRequestTest, KafkaConfig, KafkaServer}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
+import org.scalatest.Assertions.intercept
+
+class FeatureCommandTest extends BaseRequestTest {
+  override def brokerCount: Int = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {

Review comment:
       Could this be a val instead of a def?

##########
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+    val opts = new FeatureCommandOptions(args)
+    val featureApis = new FeatureApis(opts)
+    var exitCode = 0
+    try {
+      featureApis.execute()
+    } catch {
+      case e: IllegalArgumentException =>
+        printException(e)
+        opts.parser.printHelpOn(System.err)
+        exitCode = 1
+      case _: UpdateFeaturesException =>
+        exitCode = 1
+      case e: Throwable =>
+        printException(e)
+        exitCode = 1
+    } finally {
+      featureApis.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  private def printException(exception: Throwable): Unit = {
+    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+    f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+    opts = newOpts
+  }
+
+  private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
+    val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+    adminClient.describeFeatures(options).featureMetadata().get()
+  }
+
+  /**
+   * Describes the supported and finalized features. If the --from-controller CLI option
+   * is provided, then the request is issued only to the controller, otherwise the request is issued
+   * to any of the provided bootstrap servers.
+   */
+  def describeFeatures(): Unit = {
+    val result = describeFeatures(opts.hasFromControllerOption)
+    val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
+
+    features.toList.sorted.foreach {
+      feature =>
+        val output = new StringBuilder()
+        output.append(s"Feature: $feature")
+
+        val (supportedMinVersion, supportedMaxVersion) = {
+          val supportedVersionRange = result.supportedFeatures.get(feature)
+          if (supportedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
+          }
+        }
+        output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+        output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+        val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+          val finalizedVersionRange = result.finalizedFeatures.get(feature)
+          if (finalizedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
+          }
+        }
+        output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
+        output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+
+        val epoch = {
+          if (result.finalizedFeaturesEpoch.isPresent) {
+            result.finalizedFeaturesEpoch.get.toString
+          } else {
+            "-"
+          }
+        }
+        output.append(s"\tEpoch: $epoch")
+
+        println(output)
+    }
+  }
+
+  /**
+   * Upgrades all features known to this tool to their highest max version levels. The method may
+   * add new finalized features if they were not finalized previously, but it does not delete
+   * any existing finalized feature. The results of the feature updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def upgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val updates = supportedFeatures.features.asScala.map {
+      case (feature, targetVersionRange) =>
+        val existingVersionRange = existingFinalizedFeatures.get(feature)
+        if (existingVersionRange == null) {
+          val updateStr =
+            addOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: -" +
+            s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+          (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+        } else {
+          if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              upgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Downgrades existing finalized features to the highest max version levels known to this tool.
+   * The method may delete existing finalized features if they are no longer seen to be supported,
+   * but it does not add a feature that was not finalized previously. The results of the feature
+   * updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def downgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val supportedFeaturesMap = supportedFeatures.features
+    val updates = existingFinalizedFeatures.asScala.map {
+      case (feature, existingVersionRange) =>
+        val targetVersionRange = supportedFeaturesMap.get(feature)
+        if (targetVersionRange == null) {
+          val updateStr =
+            deleteOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+            s"\tNewFinalizedMaxVersion: -"
+          (feature, (updateStr, new FeatureUpdate(0, true)))
+        } else {
+          if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              downgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, true)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Applies the provided feature updates. If the --dry-run CLI option is provided, the method
+   * only prints the expected feature updates to STDOUT without applying them.
+   *
+   * @param updates the feature updates to be applied via the admin client
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
+    if (opts.hasDryRunOption) {
+      println("Expected feature updates:")
+      println(ListMap(updates.toSeq.sortBy(_._1):_*)
+                .map { case(_, (updateStr, _)) => updateStr}
+                .mkString("\n"))
+    } else {
+      val result = adminClient.updateFeatures(
+        updates.map { case(feature, (_, update)) => (feature, update)}.asJava,
+        new UpdateFeaturesOptions())
+      val failures = ListMap(result.values.asScala.toSeq.sortBy(_._1):_*).map {

Review comment:
       Could we use a case pattern here to avoid the unnamed reference _._1, e.g. sortBy { case (feature, _) => feature }?

##########
File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+    val opts = new FeatureCommandOptions(args)
+    val featureApis = new FeatureApis(opts)
+    var exitCode = 0
+    try {
+      featureApis.execute()
+    } catch {
+      case e: IllegalArgumentException =>
+        printException(e)
+        opts.parser.printHelpOn(System.err)
+        exitCode = 1
+      case _: UpdateFeaturesException =>
+        exitCode = 1
+      case e: Throwable =>
+        printException(e)
+        exitCode = 1
+    } finally {
+      featureApis.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  private def printException(exception: Throwable): Unit = {
+    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+    f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+    opts = newOpts
+  }
+
+  private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
+    val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+    adminClient.describeFeatures(options).featureMetadata().get()
+  }
+
+  /**
+   * Describes the supported and finalized features. If the --from-controller CLI option
+   * is provided, then the request is issued only to the controller, otherwise the request is issued
+   * to any of the provided bootstrap servers.
+   */
+  def describeFeatures(): Unit = {
+    val result = describeFeatures(opts.hasFromControllerOption)
+    val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
+
+    features.toList.sorted.foreach {
+      feature =>
+        val output = new StringBuilder()
+        output.append(s"Feature: $feature")
+
+        val (supportedMinVersion, supportedMaxVersion) = {
+          val supportedVersionRange = result.supportedFeatures.get(feature)
+          if (supportedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
+          }
+        }
+        output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+        output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+        val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+          val finalizedVersionRange = result.finalizedFeatures.get(feature)
+          if (finalizedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
+          }
+        }
+        output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
+        output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+
+        val epoch = {
+          if (result.finalizedFeaturesEpoch.isPresent) {
+            result.finalizedFeaturesEpoch.get.toString
+          } else {
+            "-"
+          }
+        }
+        output.append(s"\tEpoch: $epoch")
+
+        println(output)
+    }
+  }
+
+  /**
+   * Upgrades all features known to this tool to their highest max version levels. The method may
+   * add new finalized features if they were not finalized previously, but it does not delete
+   * any existing finalized feature. The results of the feature updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def upgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val updates = supportedFeatures.features.asScala.map {
+      case (feature, targetVersionRange) =>
+        val existingVersionRange = existingFinalizedFeatures.get(feature)
+        if (existingVersionRange == null) {
+          val updateStr =
+            addOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: -" +
+            s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+          (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+        } else {
+          if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              upgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, false)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Downgrades existing finalized features to the highest max version levels known to this tool.
+   * The method may delete existing finalized features if they are no longer seen to be supported,
+   * but it does not add a feature that was not finalized previously. The results of the feature
+   * updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def downgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val supportedFeaturesMap = supportedFeatures.features
+    val updates = existingFinalizedFeatures.asScala.map {
+      case (feature, existingVersionRange) =>
+        val targetVersionRange = supportedFeaturesMap.get(feature)
+        if (targetVersionRange == null) {
+          val updateStr =
+            deleteOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+            s"\tNewFinalizedMaxVersion: -"
+          (feature, (updateStr, new FeatureUpdate(0, true)))
+        } else {
+          if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              downgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, (updateStr, new FeatureUpdate(targetVersionRange.max, true)))
+          } else {
+            (feature, null)
+          }
+        }
+    }.filter{ case(_, updateInfo) => updateInfo != null}.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Sends the given feature updates through the admin client, or, when the --dry-run CLI
+   * option is set, merely prints the updates that would have been sent. In both modes the
+   * output is written to STDOUT, one line per feature, ordered by feature name.
+   *
+   * @param updates map from feature name to a pair of (human-readable description of the
+   *                update, the update to be sent via the admin client)
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
+    if (!opts.hasDryRunOption) {
+      val requested = updates.map { case (feature, (_, update)) => (feature, update) }.asJava
+      val result = adminClient.updateFeatures(requested, new UpdateFeaturesOptions())
+
+      // Wait on each per-feature future in feature-name order, reporting every outcome and
+      // counting the ones that did not complete successfully.
+      val failures = result.values.asScala.toSeq.sortBy(_._1).count {
+        case (feature, updateFuture) =>
+          val updateStr = updates(feature)._1
+          try {
+            updateFuture.get
+            println(updateStr + "\tResult: OK")
+            false
+          } catch {
+            case e: Exception =>
+              println(updateStr + "\tResult: FAILED due to " + e.getMessage)
+              true
+          }
+      }
+
+      if (failures > 0) {
+        throw new UpdateFeaturesException(s"$failures feature updates failed!")
+      }
+    } else {
+      // Dry run: only list the descriptions, sorted by feature name.
+      println("Expected feature updates:")
+      val descriptions = updates.toSeq.sortBy(_._1).map { case (_, (updateStr, _)) => updateStr }
+      println(descriptions.mkString("\n"))
+    }
+  }
+
+  /**
+   * Runs the single command selected on the command line. The options are checked in the
+   * order: describe, upgrade-all, downgrade-all.
+   *
+   * @throws IllegalStateException if none of the command options is present
+   */
+  def execute(): Unit =
+    if (opts.hasDescribeOption)
+      describeFeatures()
+    else if (opts.hasUpgradeAllOption)
+      upgradeAllFeatures()
+    else if (opts.hasDowngradeAllOption)
+      downgradeAllFeatures()
+    else
+      throw new IllegalStateException("Unexpected state: no CLI command could be executed.")
+
+  /** Closes the underlying admin client. */
+  def close(): Unit = adminClient.close()
+
+  private def createAdminClient(): Admin = {
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServers)
+    Admin.create(props)

Review comment:
       The bootstrap port may be secured (e.g., SSL, SASL). So we need to be able to pass in the security configs from the command line. See commandConfigOpt in TopicCommand.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org