You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2020/10/20 16:07:25 UTC
[kafka] branch 2.7 updated: KAFKA-10599: Implement basic CLI tool
for feature versioning system (#9409) (#9455)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new eb14539 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)
eb14539 is described below
commit eb1453990b147f2f8d6c9d1209990c365739f23b
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Tue Oct 20 09:02:08 2020 -0700
KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)
This PR implements a basic CLI tool for feature versioning system. The KIP-584 write up has been updated to suit this PR. The following is implemented in this PR:
--describe:
Describe supported and finalized features.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --describe [--from-controller] [--command-config <path_to_java_properties_file>]
Optionally, use the --from-controller option to get features from the controller.
--upgrade-all:
Upgrades all features known to the tool to their highest max version levels.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --upgrade-all [--dry-run] [--command-config <path_to_java_properties_file>]
Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them.
--downgrade-all:
Downgrades existing finalized features to the highest max version levels known to this tool.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --downgrade-all [--dry-run] [--command-config <path_to_java_properties_file>].
Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them.
Reviewers: Boyang Chen <bo...@confluent.io>, Jun Rao <ju...@gmail.com>
---
bin/kafka-features.sh | 17 +
.../main/scala/kafka/admin/FeatureCommand.scala | 408 +++++++++++++++++++++
.../unit/kafka/admin/FeatureCommandTest.scala | 244 ++++++++++++
3 files changed, 669 insertions(+)
diff --git a/bin/kafka-features.sh b/bin/kafka-features.sh
new file mode 100755
index 0000000..9dd9f16
--- /dev/null
+++ b/bin/kafka-features.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@"
diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala
new file mode 100644
index 0000000..9cc0a10
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+import java.util.Properties
+
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+import joptsimple.OptionSpec
+
+import scala.concurrent.ExecutionException
+
+object FeatureCommand {
+
+ def main(args: Array[String]): Unit = {
+ val opts = new FeatureCommandOptions(args)
+ val featureApis = new FeatureApis(opts)
+ var exitCode = 0
+ try {
+ featureApis.execute()
+ } catch {
+ case e: IllegalArgumentException =>
+ printException(e)
+ opts.parser.printHelpOn(System.err)
+ exitCode = 1
+ case _: UpdateFeaturesException =>
+ exitCode = 1
+ case e: ExecutionException =>
+ val cause = if (e.getCause == null) e else e.getCause
+ printException(cause)
+ exitCode = 1
+ case e: Throwable =>
+ printException(e)
+ exitCode = 1
+ } finally {
+ featureApis.close()
+ Exit.exit(exitCode)
+ }
+ }
+
+ private def printException(exception: Throwable): Unit = {
+ System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+ }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge feature APIs provided by the the Admin client with
+ * the requirements of the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(private var opts: FeatureCommandOptions) {
+ private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+ private var adminClient = FeatureApis.createAdminClient(opts)
+
+ private def pad(op: String): String = {
+ f"$op%11s"
+ }
+
+ private val addOp = pad("[Add]")
+ private val upgradeOp = pad("[Upgrade]")
+ private val deleteOp = pad("[Delete]")
+ private val downgradeOp = pad("[Downgrade]")
+
+ // For testing only.
+ private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+ supportedFeatures = newFeatures
+ }
+
+ // For testing only.
+ private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
+ adminClient.close()
+ adminClient = FeatureApis.createAdminClient(newOpts)
+ opts = newOpts
+ }
+
+ private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
+ val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
+ adminClient.describeFeatures(options).featureMetadata().get()
+ }
+
+ /**
+ * Describes the supported and finalized features. If the --from-controller CLI option
+ * is provided, then the request is issued only to the controller, otherwise the request is issued
+ * to any of the provided bootstrap servers.
+ */
+ def describeFeatures(): Unit = {
+ val result = describeFeatures(opts.hasFromControllerOption)
+ val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
+
+ features.toList.sorted.foreach {
+ feature =>
+ val output = new StringBuilder()
+ output.append(s"Feature: $feature")
+
+ val (supportedMinVersion, supportedMaxVersion) = {
+ val supportedVersionRange = result.supportedFeatures.get(feature)
+ if (supportedVersionRange == null) {
+ ("-", "-")
+ } else {
+ (supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
+ }
+ }
+ output.append(s"\tSupportedMinVersion: $supportedMinVersion")
+ output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
+
+ val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
+ val finalizedVersionRange = result.finalizedFeatures.get(feature)
+ if (finalizedVersionRange == null) {
+ ("-", "-")
+ } else {
+ (finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
+ }
+ }
+ output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
+ output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
+
+ val epoch = {
+ if (result.finalizedFeaturesEpoch.isPresent) {
+ result.finalizedFeaturesEpoch.get.toString
+ } else {
+ "-"
+ }
+ }
+ output.append(s"\tEpoch: $epoch")
+
+ println(output)
+ }
+ }
+
+ /**
+ * Upgrades all features known to this tool to their highest max version levels. The method may
+ * add new finalized features if they were not finalized previously, but it does not delete
+ * any existing finalized feature. The results of the feature updates are written to STDOUT.
+ *
+ * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+ * updates to STDOUT, without applying them.
+ *
+ * @throws UpdateFeaturesException if at least one of the feature updates failed
+ */
+ def upgradeAllFeatures(): Unit = {
+ val metadata = describeFeatures(true)
+ val existingFinalizedFeatures = metadata.finalizedFeatures
+ val updates = supportedFeatures.features.asScala.map {
+ case (feature, targetVersionRange) =>
+ val existingVersionRange = existingFinalizedFeatures.get(feature)
+ if (existingVersionRange == null) {
+ val updateStr =
+ addOp +
+ s"\tFeature: $feature" +
+ s"\tExistingFinalizedMaxVersion: -" +
+ s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+ (feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
+ } else {
+ if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
+ val updateStr =
+ upgradeOp +
+ s"\tFeature: $feature" +
+ s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+ s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+ (feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
+ } else {
+ (feature, Option.empty)
+ }
+ }
+ }.filter {
+ case(_, updateInfo) => updateInfo.isDefined
+ }.map {
+ case(feature, updateInfo) => (feature, updateInfo.get)
+ }.toMap
+
+ if (updates.nonEmpty) {
+ maybeApplyFeatureUpdates(updates)
+ }
+ }
+
+ /**
+ * Downgrades existing finalized features to the highest max version levels known to this tool.
+ * The method may delete existing finalized features if they are no longer seen to be supported,
+ * but it does not add a feature that was not finalized previously. The results of the feature
+ * updates are written to STDOUT.
+ *
+ * NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
+ * updates to STDOUT, without applying them.
+ *
+ * @throws UpdateFeaturesException if at least one of the feature updates failed
+ */
+ def downgradeAllFeatures(): Unit = {
+ val metadata = describeFeatures(true)
+ val existingFinalizedFeatures = metadata.finalizedFeatures
+ val supportedFeaturesMap = supportedFeatures.features
+ val updates = existingFinalizedFeatures.asScala.map {
+ case (feature, existingVersionRange) =>
+ val targetVersionRange = supportedFeaturesMap.get(feature)
+ if (targetVersionRange == null) {
+ val updateStr =
+ deleteOp +
+ s"\tFeature: $feature" +
+ s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+ s"\tNewFinalizedMaxVersion: -"
+ (feature, Some(updateStr, new FeatureUpdate(0, true)))
+ } else {
+ if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
+ val updateStr =
+ downgradeOp +
+ s"\tFeature: $feature" +
+ s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
+ s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
+ (feature, Some(updateStr, new FeatureUpdate(targetVersionRange.max, true)))
+ } else {
+ (feature, Option.empty)
+ }
+ }
+ }.filter {
+ case(_, updateInfo) => updateInfo.isDefined
+ }.map {
+ case(feature, updateInfo) => (feature, updateInfo.get)
+ }.toMap
+
+ if (updates.nonEmpty) {
+ maybeApplyFeatureUpdates(updates)
+ }
+ }
+
+ /**
+ * Applies the provided feature updates. If the --dry-run CLI option is provided, the method
+ * only prints the expected feature updates to STDOUT without applying them.
+ *
+ * @param updates the feature updates to be applied via the admin client
+ *
+ * @throws UpdateFeaturesException if at least one of the feature updates failed
+ */
+ private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
+ if (opts.hasDryRunOption) {
+ println("Expected feature updates:" + ListMap(
+ updates
+ .toSeq
+ .sortBy { case(feature, _) => feature} :_*)
+ .map { case(_, (updateStr, _)) => updateStr}
+ .mkString("\n"))
+ } else {
+ val result = adminClient.updateFeatures(
+ updates
+ .map { case(feature, (_, update)) => (feature, update)}
+ .asJava,
+ new UpdateFeaturesOptions())
+ val resultSortedByFeature = ListMap(
+ result
+ .values
+ .asScala
+ .toSeq
+ .sortBy { case(feature, _) => feature} :_*)
+ val failures = resultSortedByFeature.map {
+ case (feature, updateFuture) =>
+ val (updateStr, _) = updates(feature)
+ try {
+ updateFuture.get
+ println(updateStr + "\tResult: OK")
+ 0
+ } catch {
+ case e: ExecutionException =>
+ val cause = if (e.getCause == null) e else e.getCause
+ println(updateStr + "\tResult: FAILED due to " + cause)
+ 1
+ case e: Throwable =>
+ println(updateStr + "\tResult: FAILED due to " + e)
+ 1
+ }
+ }.sum
+ if (failures > 0) {
+ throw new UpdateFeaturesException(s"$failures feature updates failed!")
+ }
+ }
+ }
+
+ def execute(): Unit = {
+ if (opts.hasDescribeOption) {
+ describeFeatures()
+ } else if (opts.hasUpgradeAllOption) {
+ upgradeAllFeatures()
+ } else if (opts.hasDowngradeAllOption) {
+ downgradeAllFeatures()
+ } else {
+ throw new IllegalStateException("Unexpected state: no CLI command could be executed.")
+ }
+ }
+
+ def close(): Unit = {
+ adminClient.close()
+ }
+}
+
+class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
+ private val bootstrapServerOpt = parser.accepts(
+ "bootstrap-server",
+ "REQUIRED: A comma-separated list of host:port pairs to use for establishing the connection" +
+ " to the Kafka cluster.")
+ .withRequiredArg
+ .describedAs("server to connect to")
+ .ofType(classOf[String])
+ private val commandConfigOpt = parser.accepts(
+ "command-config",
+ "Property file containing configs to be passed to Admin Client." +
+ " This is used with --bootstrap-server option when required.")
+ .withOptionalArg
+ .describedAs("command config property file")
+ .ofType(classOf[String])
+ private val describeOpt = parser.accepts(
+ "describe",
+ "Describe supported and finalized features. By default, the features are described from a" +
+ " random broker. The request can be optionally directed only to the controller using the" +
+ " --from-controller option.")
+ private val fromControllerOpt = parser.accepts(
+ "from-controller",
+ "Describe supported and finalized features from the controller.")
+ private val upgradeAllOpt = parser.accepts(
+ "upgrade-all",
+ "Upgrades all finalized features to the maximum version levels known to the tool." +
+ " This command finalizes new features known to the tool that were never finalized" +
+ " previously in the cluster, but it is guaranteed to not delete any existing feature.")
+ private val downgradeAllOpt = parser.accepts(
+ "downgrade-all",
+ "Downgrades all finalized features to the maximum version levels known to the tool." +
+ " This command deletes unknown features from the list of finalized features in the" +
+ " cluster, but it is guaranteed to not add a new feature.")
+ private val dryRunOpt = parser.accepts(
+ "dry-run",
+ "Performs a dry-run of upgrade/downgrade mutations to finalized feature without applying them.")
+
+ options = parser.parse(args : _*)
+
+ checkArgs()
+
+ def has(builder: OptionSpec[_]): Boolean = options.has(builder)
+
+ def hasDescribeOption: Boolean = has(describeOpt)
+
+ def hasFromControllerOption: Boolean = has(fromControllerOpt)
+
+ def hasDryRunOption: Boolean = has(dryRunOpt)
+
+ def hasUpgradeAllOption: Boolean = has(upgradeAllOpt)
+
+ def hasDowngradeAllOption: Boolean = has(downgradeAllOpt)
+
+ def commandConfig: Properties = {
+ if (has(commandConfigOpt))
+ Utils.loadProps(options.valueOf(commandConfigOpt))
+ else
+ new Properties()
+ }
+
+ def bootstrapServers: String = options.valueOf(bootstrapServerOpt)
+
+ def checkArgs(): Unit = {
+ CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool describes and updates finalized features.")
+ val numActions = Seq(describeOpt, upgradeAllOpt, downgradeAllOpt).count(has)
+ if (numActions != 1) {
+ CommandLineUtils.printUsageAndDie(
+ parser,
+ "Command must include exactly one action: --describe, --upgrade-all, --downgrade-all.")
+ }
+ CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
+ if (hasDryRunOption && !hasUpgradeAllOption && !hasDowngradeAllOption) {
+ CommandLineUtils.printUsageAndDie(
+ parser,
+ "Command can contain --dry-run option only when either --upgrade-all or --downgrade-all actions are provided.")
+ }
+ if (hasFromControllerOption && !hasDescribeOption) {
+ CommandLineUtils.printUsageAndDie(
+ parser,
+ "Command can contain --from-controller option only when --describe action is provided.")
+ }
+ }
+}
+
+object FeatureApis {
+ private def createAdminClient(opts: FeatureCommandOptions): Admin = {
+ val props = new Properties()
+ props.putAll(opts.commandConfig)
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServers)
+ Admin.create(props)
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
new file mode 100644
index 0000000..0b9f80d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.server.{BaseRequestTest, KafkaConfig, KafkaServer}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.Test
+import org.scalatest.Assertions.intercept
+
+class FeatureCommandTest extends BaseRequestTest {
+ override def brokerCount: Int = 3
+
+ override def brokerPropertyOverrides(props: Properties): Unit = {
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString)
+ }
+
+ private val defaultSupportedFeatures: Features[SupportedVersionRange] =
+ Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
+ Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5))))
+
+ private def updateSupportedFeatures(features: Features[SupportedVersionRange],
+ targetServers: Set[KafkaServer]): Unit = {
+ targetServers.foreach(s => {
+ s.brokerFeatures.setSupportedFeatures(features)
+ s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+ })
+
+ // Wait until updates to all BrokerZNode supported features propagate to the controller.
+ val brokerIds = targetServers.map(s => s.config.brokerId)
+ waitUntilTrue(
+ () => servers.exists(s => {
+ if (s.kafkaController.isActive) {
+ s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+ .filter(b => brokerIds.contains(b.id))
+ .forall(b => {
+ b.features.equals(features)
+ })
+ } else {
+ false
+ }
+ }),
+ "Controller did not get broker updates")
+ }
+
+ private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = {
+ updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+ }
+
+ /**
+ * Tests if the FeatureApis#describeFeatures API works as expected when describing features before and
+ * after upgrading features.
+ */
+ @Test
+ def testDescribeFeaturesSuccess(): Unit = {
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
+ val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe", "--from-controller")))
+ featureApis.setSupportedFeatures(defaultSupportedFeatures)
+ try {
+ val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
+ val expectedInitialDescribeOutput =
+ "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n" +
+ "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n"
+ assertEquals(expectedInitialDescribeOutput, initialDescribeOutput)
+ featureApis.upgradeAllFeatures()
+ val finalDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
+ val expectedFinalDescribeOutput =
+ "Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 3\tEpoch: 1\n" +
+ "Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 5\tEpoch: 1\n"
+ assertEquals(expectedFinalDescribeOutput, finalDescribeOutput)
+ } finally {
+ featureApis.close()
+ }
+ }
+
+ /**
+ * Tests if the FeatureApis#upgradeAllFeatures API works as expected during a success case.
+ */
+ @Test
+ def testUpgradeAllFeaturesSuccess(): Unit = {
+ val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
+ val featureApis = new FeatureApis(upgradeOpts)
+ try {
+ // Step (1):
+ // - Update the supported features across all brokers.
+ // - Upgrade non-existing feature_1 to maxVersionLevel: 2.
+ // - Verify results.
+ val initialSupportedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
+ updateSupportedFeaturesInAllBrokers(initialSupportedFeatures)
+ featureApis.setSupportedFeatures(initialSupportedFeatures)
+ var output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
+ var expected =
+ " [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 2\tResult: OK\n"
+ assertEquals(expected, output)
+
+ // Step (2):
+ // - Update the supported features across all brokers.
+ // - Upgrade existing feature_1 to maxVersionLevel: 3.
+ // - Upgrade non-existing feature_2 to maxVersionLevel: 5.
+ // - Verify results.
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
+ featureApis.setSupportedFeatures(defaultSupportedFeatures)
+ output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
+ expected =
+ " [Upgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: 3\tResult: OK\n" +
+ " [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 5\tResult: OK\n"
+ assertEquals(expected, output)
+
+ // Step (3):
+ // - Perform an upgrade of all features again.
+ // - Since supported features have not changed, expect that the above action does not yield
+ // any results.
+ output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
+ assertTrue(output.isEmpty)
+ featureApis.setOptions(upgradeOpts)
+ output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
+ assertTrue(output.isEmpty)
+ } finally {
+ featureApis.close()
+ }
+ }
+
+ /**
+ * Tests if the FeatureApis#downgradeAllFeatures API works as expected during a success case.
+ */
+ @Test
+ def testDowngradeFeaturesSuccess(): Unit = {
+ val downgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--downgrade-all"))
+ val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
+ val featureApis = new FeatureApis(upgradeOpts)
+ try {
+ // Step (1):
+ // - Update the supported features across all brokers.
+ // - Upgrade non-existing feature_1 to maxVersionLevel: 3.
+ // - Upgrade non-existing feature_2 to maxVersionLevel: 5.
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
+ featureApis.setSupportedFeatures(defaultSupportedFeatures)
+ featureApis.upgradeAllFeatures()
+
+ // Step (2):
+ // - Downgrade existing feature_1 to maxVersionLevel: 2.
+ // - Delete feature_2 since it is no longer supported by the FeatureApis object.
+ // - Verify results.
+ val downgradedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
+ featureApis.setSupportedFeatures(downgradedFeatures)
+ featureApis.setOptions(downgradeOpts)
+ var output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
+ var expected =
+ "[Downgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 3\tNewFinalizedMaxVersion: 2\tResult: OK\n" +
+ " [Delete]\tFeature: feature_2\tExistingFinalizedMaxVersion: 5\tNewFinalizedMaxVersion: -\tResult: OK\n"
+ assertEquals(expected, output)
+
+ // Step (3):
+ // - Perform a downgrade of all features again.
+ // - Since supported features have not changed, expect that the above action does not yield
+ // any results.
+ updateSupportedFeaturesInAllBrokers(downgradedFeatures)
+ output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
+ assertTrue(output.isEmpty)
+
+ // Step (4):
+ // - Delete feature_1 since it is no longer supported by the FeatureApis object.
+ // - Verify results.
+ featureApis.setSupportedFeatures(Features.emptySupportedFeatures())
+ output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
+ expected =
+ " [Delete]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: -\tResult: OK\n"
+ assertEquals(expected, output)
+ } finally {
+ featureApis.close()
+ }
+ }
+
+ /**
+ * Tests if the FeatureApis#upgradeAllFeatures API works as expected during a partial failure case.
+ */
+ @Test
+ def testUpgradeFeaturesFailure(): Unit = {
+ val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
+ val featureApis = new FeatureApis(upgradeOpts)
+ try {
+ // Step (1): Update the supported features across all brokers.
+ updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
+
+ // Step (2):
+ // - Intentionally setup the FeatureApis object such that it contains incompatible target
+ // features (viz. feature_2 and feature_3).
+ // - Upgrade non-existing feature_1 to maxVersionLevel: 4. Expect the operation to fail with
+ // an incompatibility failure.
+ // - Upgrade non-existing feature_2 to maxVersionLevel: 5. Expect the operation to succeed.
+ // - Upgrade non-existing feature_3 to maxVersionLevel: 3. Expect the operation to fail
+ // since the feature is not supported.
+ val targetFeaturesWithIncompatibilities =
+ Features.supportedFeatures(
+ Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 4)),
+ Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5)),
+ Utils.mkEntry("feature_3", new SupportedVersionRange(1, 3))))
+ featureApis.setSupportedFeatures(targetFeaturesWithIncompatibilities)
+ val output = TestUtils.grabConsoleOutput({
+ val exception = intercept[UpdateFeaturesException] {
+ featureApis.upgradeAllFeatures()
+ }
+ assertEquals("2 feature updates failed!", exception.getMessage)
+ })
+ val expected =
+ " [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -" +
+ "\tNewFinalizedMaxVersion: 4\tResult: FAILED due to" +
+ " org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
+ " feature update because brokers were found to have incompatible versions for the" +
+ " feature.\n" +
+ " [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -" +
+ "\tNewFinalizedMaxVersion: 5\tResult: OK\n" +
+ " [Add]\tFeature: feature_3\tExistingFinalizedMaxVersion: -" +
+ "\tNewFinalizedMaxVersion: 3\tResult: FAILED due to" +
+ " org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
+ " feature update because the provided feature is not supported.\n"
+ assertEquals(expected, output)
+ } finally {
+ featureApis.close()
+ }
+ }
+}