You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2020/10/20 16:07:25 UTC

[kafka] branch 2.7 updated: KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new eb14539  KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)
eb14539 is described below

commit eb1453990b147f2f8d6c9d1209990c365739f23b
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Tue Oct 20 09:02:08 2020 -0700

    KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)
    
    This PR implements a basic CLI tool for feature versioning system. The KIP-584 write up has been updated to suit this PR. The following is implemented in this PR:
    
    --describe:
    Describe supported and finalized features.
    Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --describe [--from-controller] [--command-config <path_to_java_properties_file>]
    Optionally, use the --from-controller option to get features from the controller.
    --upgrade-all:
    Upgrades all features known to the tool to their highest max version levels.
    Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --upgrade-all [--dry-run] [--command-config <path_to_java_properties_file>]
    Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them.
    --downgrade-all:
    Downgrades existing finalized features to the highest max version levels known to this tool.
    Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --downgrade-all [--dry-run] [--command-config <path_to_java_properties_file>].
    Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 bin/kafka-features.sh                              |  17 +
 .../main/scala/kafka/admin/FeatureCommand.scala    | 408 +++++++++++++++++++++
 .../unit/kafka/admin/FeatureCommandTest.scala      | 244 ++++++++++++
 3 files changed, 669 insertions(+)

diff --git a/bin/kafka-features.sh b/bin/kafka-features.sh
new file mode 100755
index 0000000..9dd9f16
--- /dev/null
+++ b/bin/kafka-features.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@"
diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala
new file mode 100644
index 0000000..9cc0a10
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+import java.util.Properties
+
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+import joptsimple.OptionSpec
+
+import scala.concurrent.ExecutionException
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+    val opts = new FeatureCommandOptions(args)
+    val featureApis = new FeatureApis(opts)
+    var exitCode = 0
+    try {
+      featureApis.execute()
+    } catch {
+      case e: IllegalArgumentException =>
+        printException(e)
+        opts.parser.printHelpOn(System.err)
+        exitCode = 1
+      case _: UpdateFeaturesException =>
+        exitCode = 1
+      case e: ExecutionException =>
+        val cause = if (e.getCause == null) e else e.getCause
+        printException(cause)
+        exitCode = 1
+      case e: Throwable =>
+        printException(e)
+        exitCode = 1
+    } finally {
+      featureApis.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  private def printException(exception: Throwable): Unit = {
+    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge feature APIs provided by the the Admin client with
+ * the requirements of the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(private var opts: FeatureCommandOptions) {
+  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+  private var adminClient = FeatureApis.createAdminClient(opts)
+
+  private def pad(op: String): String = {
+    f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+    adminClient.close()
+    adminClient = FeatureApis.createAdminClient(newOpts)
+    opts = newOpts
+  }
+
+  private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
+    val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+    adminClient.describeFeatures(options).featureMetadata().get()
+  }
+
+  /**
+   * Describes the supported and finalized features. If the --from-controller CLI option
+   * is provided, then the request is issued only to the controller, otherwise the request is issued
+   * to any of the provided bootstrap servers.
+   */
+  def describeFeatures(): Unit = {
+    val result = describeFeatures(opts.hasFromControllerOption)
+    val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
+
+    features.toList.sorted.foreach {
+      feature =>
+        val output = new StringBuilder()
+        output.append(s"Feature: $feature")
+
+        val (supportedMinVersion, supportedMaxVersion) = {
+          val supportedVersionRange = result.supportedFeatures.get(feature)
+          if (supportedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
+          }
+        }
+        output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+        output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+        val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+          val finalizedVersionRange = result.finalizedFeatures.get(feature)
+          if (finalizedVersionRange == null) {
+            ("-", "-")
+          } else {
+            (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
+          }
+        }
+        output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
+        output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+
+        val epoch = {
+          if (result.finalizedFeaturesEpoch.isPresent) {
+            result.finalizedFeaturesEpoch.get.toString
+          } else {
+            "-"
+          }
+        }
+        output.append(s"\tEpoch: $epoch")
+
+        println(output)
+    }
+  }
+
+  /**
+   * Upgrades all features known to this tool to their highest max version levels. The method may
+   * add new finalized features if they were not finalized previously, but it does not delete
+   * any existing finalized feature. The results of the feature updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def upgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val updates = supportedFeatures.features.asScala.map {
+      case (feature, targetVersionRange) =>
+        val existingVersionRange = existingFinalizedFeatures.get(feature)
+        if (existingVersionRange == null) {
+          val updateStr =
+            addOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: -" +
+            s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+          (feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
+        } else {
+          if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              upgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
+          } else {
+            (feature, Option.empty)
+          }
+        }
+    }.filter {
+      case(_, updateInfo) => updateInfo.isDefined
+    }.map {
+      case(feature, updateInfo) => (feature, updateInfo.get)
+    }.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Downgrades existing finalized features to the highest max version levels known to this tool.
+   * The method may delete existing finalized features if they are no longer seen to be supported,
+   * but it does not add a feature that was not finalized previously. The results of the feature
+   * updates are written to STDOUT.
+   *
+   * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+   * updates to STDOUT, without applying them.
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  def downgradeAllFeatures(): Unit = {
+    val metadata = describeFeatures(true)
+    val existingFinalizedFeatures = metadata.finalizedFeatures
+    val supportedFeaturesMap = supportedFeatures.features
+    val updates = existingFinalizedFeatures.asScala.map {
+      case (feature, existingVersionRange) =>
+        val targetVersionRange = supportedFeaturesMap.get(feature)
+        if (targetVersionRange == null) {
+          val updateStr =
+            deleteOp +
+            s"\tFeature: $feature" +
+            s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+            s"\tNewFinalizedMaxVersion: -"
+          (feature, Some(updateStr, new FeatureUpdate(0, true)))
+        } else {
+          if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
+            val updateStr =
+              downgradeOp +
+              s"\tFeature: $feature" +
+              s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+              s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+            (feature, Some(updateStr, new FeatureUpdate(targetVersionRange.max, true)))
+          } else {
+            (feature, Option.empty)
+          }
+        }
+    }.filter {
+      case(_, updateInfo) => updateInfo.isDefined
+    }.map {
+      case(feature, updateInfo) => (feature, updateInfo.get)
+    }.toMap
+
+    if (updates.nonEmpty) {
+      maybeApplyFeatureUpdates(updates)
+    }
+  }
+
+  /**
+   * Applies the provided feature updates. If the --dry-run CLI option is provided, the method
+   * only prints the expected feature updates to STDOUT without applying them.
+   *
+   * @param updates the feature updates to be applied via the admin client
+   *
+   * @throws UpdateFeaturesException if at least one of the feature updates failed
+   */
+  private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
+    if (opts.hasDryRunOption) {
+      println("Expected feature updates:" + ListMap(
+        updates
+          .toSeq
+          .sortBy { case(feature, _) => feature} :_*)
+          .map { case(_, (updateStr, _)) => updateStr}
+          .mkString("\n"))
+    } else {
+      val result = adminClient.updateFeatures(
+        updates
+          .map { case(feature, (_, update)) => (feature, update)}
+          .asJava,
+        new UpdateFeaturesOptions())
+      val resultSortedByFeature = ListMap(
+        result
+          .values
+          .asScala
+          .toSeq
+          .sortBy { case(feature, _) => feature} :_*)
+      val failures = resultSortedByFeature.map {
+        case (feature, updateFuture) =>
+          val (updateStr, _) = updates(feature)
+          try {
+            updateFuture.get
+            println(updateStr + "\tResult: OK")
+            0
+          } catch {
+            case e: ExecutionException =>
+              val cause = if (e.getCause == null) e else e.getCause
+              println(updateStr + "\tResult: FAILED due to " + cause)
+              1
+            case e: Throwable =>
+              println(updateStr + "\tResult: FAILED due to " + e)
+              1
+          }
+      }.sum
+      if (failures > 0) {
+        throw new UpdateFeaturesException(s"$failures feature updates failed!")
+      }
+    }
+  }
+
+  def execute(): Unit = {
+    if (opts.hasDescribeOption) {
+      describeFeatures()
+    } else if (opts.hasUpgradeAllOption) {
+      upgradeAllFeatures()
+    } else if (opts.hasDowngradeAllOption) {
+      downgradeAllFeatures()
+    } else {
+      throw new IllegalStateException("Unexpected state: no CLI command could be executed.")
+    }
+  }
+
+  def close(): Unit = {
+    adminClient.close()
+  }
+}
+
+class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+  private val bootstrapServerOpt = parser.accepts(
+      "bootstrap-server",
+      "REQUIRED: A comma-separated list of host:port pairs to use for establishing the connection" +
+      " to the Kafka cluster.")
+      .withRequiredArg
+      .describedAs("server to connect to")
+      .ofType(classOf[String])
+  private val commandConfigOpt = parser.accepts(
+    "command-config",
+    "Property file containing configs to be passed to Admin Client." +
+    " This is used with --bootstrap-server option when required.")
+    .withOptionalArg
+    .describedAs("command config property file")
+    .ofType(classOf[String])
+  private val describeOpt = parser.accepts(
+    "describe",
+    "Describe supported and finalized features. By default, the features are described from a" +
+    " random broker. The request can be optionally directed only to the controller using the" +
+    " --from-controller option.")
+  private val fromControllerOpt = parser.accepts(
+    "from-controller",
+    "Describe supported and finalized features from the controller.")
+  private val upgradeAllOpt = parser.accepts(
+    "upgrade-all",
+    "Upgrades all finalized features to the maximum version levels known to the tool." +
+    " This command finalizes new features known to the tool that were never finalized" +
+    " previously in the cluster, but it is guaranteed to not delete any existing feature.")
+  private val downgradeAllOpt = parser.accepts(
+    "downgrade-all",
+    "Downgrades all finalized features to the maximum version levels known to the tool." +
+    " This command deletes unknown features from the list of finalized features in the" +
+    " cluster, but it is guaranteed to not add a new feature.")
+  private val dryRunOpt = parser.accepts(
+    "dry-run",
+    "Performs a dry-run of upgrade/downgrade mutations to finalized feature without applying them.")
+
+  options = parser.parse(args : _*)
+
+  checkArgs()
+
+  def has(builder: OptionSpec[_]): Boolean = options.has(builder)
+
+  def hasDescribeOption: Boolean = has(describeOpt)
+
+  def hasFromControllerOption: Boolean = has(fromControllerOpt)
+
+  def hasDryRunOption: Boolean = has(dryRunOpt)
+
+  def hasUpgradeAllOption: Boolean = has(upgradeAllOpt)
+
+  def hasDowngradeAllOption: Boolean = has(downgradeAllOpt)
+
+  def commandConfig: Properties = {
+    if (has(commandConfigOpt))
+      Utils.loadProps(options.valueOf(commandConfigOpt))
+    else
+      new Properties()
+  }
+
+  def bootstrapServers: String = options.valueOf(bootstrapServerOpt)
+
+  def checkArgs(): Unit = {
+    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool describes and updates finalized features.")
+    val numActions = Seq(describeOpt, upgradeAllOpt, downgradeAllOpt).count(has)
+    if (numActions != 1) {
+      CommandLineUtils.printUsageAndDie(
+        parser,
+        "Command must include exactly one action: --describe, --upgrade-all, --downgrade-all.")
+    }
+    CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
+    if (hasDryRunOption && !hasUpgradeAllOption && !hasDowngradeAllOption) {
+      CommandLineUtils.printUsageAndDie(
+        parser,
+        "Command can contain --dry-run option only when either --upgrade-all or --downgrade-all actions are provided.")
+    }
+    if (hasFromControllerOption && !hasDescribeOption) {
+      CommandLineUtils.printUsageAndDie(
+        parser,
+        "Command can contain --from-controller option only when --describe action is provided.")
+    }
+  }
+}
+
+object FeatureApis {
+  private def createAdminClient(opts: FeatureCommandOptions): Admin = {
+    val props = new Properties()
+    props.putAll(opts.commandConfig)
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServers)
+    Admin.create(props)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
new file mode 100644
index 0000000..0b9f80d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.server.{BaseRequestTest, KafkaConfig, KafkaServer}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
+import org.scalatest.Assertions.intercept
+
+class FeatureCommandTest extends BaseRequestTest {
+  override def brokerCount: Int = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString)
+  }
+
+  private val defaultSupportedFeatures: Features[SupportedVersionRange] =
+    Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+                                           Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5))))
+
+  private def updateSupportedFeatures(features: Features[SupportedVersionRange],
+                                      targetServers: Set[KafkaServer]): Unit = {
+    targetServers.foreach(s => {
+      s.brokerFeatures.setSupportedFeatures(features)
+      s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+    })
+
+    // Wait until updates to all BrokerZNode supported features propagate to the controller.
+    val brokerIds = targetServers.map(s => s.config.brokerId)
+    waitUntilTrue(
+      () => servers.exists(s => {
+        if (s.kafkaController.isActive) {
+          s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+            .filter(b => brokerIds.contains(b.id))
+            .forall(b => {
+              b.features.equals(features)
+            })
+        } else {
+          false
+        }
+      }),
+      "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = {
+    updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  /**
+   * Tests if the FeatureApis#describeFeatures API works as expected when describing features before and
+   * after upgrading features.
+   */
+  @Test
+  def testDescribeFeaturesSuccess(): Unit = {
+    updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
+    val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe", "--from-controller")))
+    featureApis.setSupportedFeatures(defaultSupportedFeatures)
+    try {
+      val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
+      val expectedInitialDescribeOutput =
+        "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n" +
+        "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n"
+      assertEquals(expectedInitialDescribeOutput, initialDescribeOutput)
+      featureApis.upgradeAllFeatures()
+      val finalDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
+      val expectedFinalDescribeOutput =
+        "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 3\tEpoch: 1\n" +
+        "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 5\tEpoch: 1\n"
+      assertEquals(expectedFinalDescribeOutput, finalDescribeOutput)
+    } finally {
+      featureApis.close()
+    }
+  }
+
+  /**
+   * Tests if the FeatureApis#upgradeAllFeatures API works as expected during a success case.
+   */
+  @Test
+  def testUpgradeAllFeaturesSuccess(): Unit = {
+    val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
+    val featureApis = new FeatureApis(upgradeOpts)
+    try {
+      // Step (1):
+      // - Update the supported features across all brokers.
+      // - Upgrade non-existing feature_1 to maxVersionLevel: 2.
+      // - Verify results.
+      val initialSupportedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
+      updateSupportedFeaturesInAllBrokers(initialSupportedFeatures)
+      featureApis.setSupportedFeatures(initialSupportedFeatures)
+      var output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
+      var expected =
+        "      [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 2\tResult: OK\n"
+      assertEquals(expected, output)
+
+      // Step (2):
+      // - Update the supported features across all brokers.
+      // - Upgrade existing feature_1 to maxVersionLevel: 3.
+      // - Upgrade non-existing feature_2 to maxVersionLevel: 5.
+      // - Verify results.
+      updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
+      featureApis.setSupportedFeatures(defaultSupportedFeatures)
+      output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
+      expected =
+        "  [Upgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: 3\tResult: OK\n" +
+        "      [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 5\tResult: OK\n"
+      assertEquals(expected, output)
+
+      // Step (3):
+      // - Perform an upgrade of all features again.
+      // - Since supported features have not changed, expect that the above action does not yield
+      //   any results.
+      output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
+      assertTrue(output.isEmpty)
+      featureApis.setOptions(upgradeOpts)
+      output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
+      assertTrue(output.isEmpty)
+    } finally {
+      featureApis.close()
+    }
+  }
+
+  /**
+   * Tests if the FeatureApis#downgradeAllFeatures API works as expected during a success case.
+   */
+  @Test
+  def testDowngradeFeaturesSuccess(): Unit = {
+    val downgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--downgrade-all"))
+    val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
+    val featureApis = new FeatureApis(upgradeOpts)
+    try {
+      // Step (1):
+      // - Update the supported features across all brokers.
+      // - Upgrade non-existing feature_1 to maxVersionLevel: 3.
+      // - Upgrade non-existing feature_2 to maxVersionLevel: 5.
+      updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
+      featureApis.setSupportedFeatures(defaultSupportedFeatures)
+      featureApis.upgradeAllFeatures()
+
+      // Step (2):
+      // - Downgrade existing feature_1 to maxVersionLevel: 2.
+      // - Delete feature_2 since it is no longer supported by the FeatureApis object.
+      // - Verify results.
+      val downgradedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
+      featureApis.setSupportedFeatures(downgradedFeatures)
+      featureApis.setOptions(downgradeOpts)
+      var output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
+      var expected =
+        "[Downgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 3\tNewFinalizedMaxVersion: 2\tResult: OK\n" +
+        "   [Delete]\tFeature: feature_2\tExistingFinalizedMaxVersion: 5\tNewFinalizedMaxVersion: -\tResult: OK\n"
+      assertEquals(expected, output)
+
+      // Step (3):
+      // - Perform a downgrade of all features again.
+      // - Since supported features have not changed, expect that the above action does not yield
+      //   any results.
+      updateSupportedFeaturesInAllBrokers(downgradedFeatures)
+      output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
+      assertTrue(output.isEmpty)
+
+      // Step (4):
+      // - Delete feature_1 since it is no longer supported by the FeatureApis object.
+      // - Verify results.
+      featureApis.setSupportedFeatures(Features.emptySupportedFeatures())
+      output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
+      expected =
+        "   [Delete]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: -\tResult: OK\n"
+      assertEquals(expected, output)
+    } finally {
+      featureApis.close()
+    }
+  }
+
+  /**
+   * Tests if the FeatureApis#upgradeAllFeatures API works as expected during a partial failure case.
+   */
+  @Test
+  def testUpgradeFeaturesFailure(): Unit = {
+    val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
+    val featureApis = new FeatureApis(upgradeOpts)
+    try {
+      // Step (1): Update the supported features across all brokers.
+      updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
+
+      // Step (2):
+      // - Intentionally setup the FeatureApis object such that it contains incompatible target
+      //   features (viz. feature_2 and feature_3).
+      // - Upgrade non-existing feature_1 to maxVersionLevel: 4. Expect the operation to fail with
+      //   an incompatibility failure.
+      // - Upgrade non-existing feature_2 to maxVersionLevel: 5. Expect the operation to succeed.
+      // - Upgrade non-existing feature_3 to maxVersionLevel: 3. Expect the operation to fail
+      //   since the feature is not supported.
+      val targetFeaturesWithIncompatibilities =
+        Features.supportedFeatures(
+          Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 4)),
+                      Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5)),
+                      Utils.mkEntry("feature_3", new SupportedVersionRange(1, 3))))
+      featureApis.setSupportedFeatures(targetFeaturesWithIncompatibilities)
+      val output = TestUtils.grabConsoleOutput({
+        val exception = intercept[UpdateFeaturesException] {
+          featureApis.upgradeAllFeatures()
+        }
+        assertEquals("2 feature updates failed!", exception.getMessage)
+      })
+      val expected =
+        "      [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -" +
+        "\tNewFinalizedMaxVersion: 4\tResult: FAILED due to" +
+        " org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
+        " feature update because brokers were found to have incompatible versions for the" +
+        " feature.\n" +
+        "      [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -" +
+        "\tNewFinalizedMaxVersion: 5\tResult: OK\n" +
+        "      [Add]\tFeature: feature_3\tExistingFinalizedMaxVersion: -" +
+        "\tNewFinalizedMaxVersion: 3\tResult: FAILED due to" +
+        " org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
+        " feature update because the provided feature is not supported.\n"
+      assertEquals(expected, output)
+    } finally {
+      featureApis.close()
+    }
+  }
+}