Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/20 05:58:38 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r453842949



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the DescribeFeaturesResult containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. The API is atomic, meaning that if a single
+     * feature update in the request can't succeed on the controller, then none of the feature
+     * updates are carried out. This request is issued only to the controller since the API is
+     * only served by the controller.
+     * <p>
+     * The API takes as input a set of FinalizedFeatureUpdate that need to be applied. Each such
+     * update specifies the finalized feature to be added or updated or deleted, along with the new
+     * max feature version level value.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the feature update has the allowDowngrade flag set - setting this flag
+     * conveys user intent to attempt downgrade of a feature max version level. Note that despite
+     * the allowDowngrade flag being set, certain downgrades may be rejected by the controller if it
+     * is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It is allowed
+     * only if the allowDowngrade flag is set in the feature update, and, if the max version level
+     * is set to a value less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFinalizedFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized feature is attempted
+     *   to be deleted or downgraded.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the updates could finish. It cannot be guaranteed whether
+     *   the updates succeeded or not.</li>
+     *   <li>{@link org.apache.kafka.common.errors.FinalizedFeatureUpdateFailedException}
+     *   If the updates could not be applied on the controller, despite the request being valid.
+     *   This may be a temporary problem.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+
+     * @param featureUpdates   the set of finalized feature updates
+     * @param options          the options to use
+     *
+     * @return                 the UpdateFinalizedFeaturesResult containing the result

Review comment:
       nit: use a `{@link UpdateFinalizedFeaturesResult}` here as well

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the DescribeFeaturesResult containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. The API is atomic, meaning that if a single
+     * feature update in the request can't succeed on the controller, then none of the feature
+     * updates are carried out. This request is issued only to the controller since the API is
+     * only served by the controller.
+     * <p>
+     * The API takes as input a set of FinalizedFeatureUpdate that need to be applied. Each such

Review comment:
       s/`as input a set of FinalizedFeatureUpdate`/`in a set of feature updates`

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedFeatureUpdate.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Set;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesRequestData;
+
+/**
+ * Encapsulates details about an update to a finalized feature. This is particularly useful to
+ * define each feature update in the
+ * {@link Admin#updateFinalizedFeatures(Set, UpdateFinalizedFeaturesOptions)} API request.
+ */
+public class FinalizedFeatureUpdate {
+    private final String featureName;
+    private final short maxVersionLevel;
+    private final boolean allowDowngrade;
+
+    /**
+     * @param featureName       the name of the finalized feature to be updated.
+     * @param maxVersionLevel   the new maximum version level for the finalized feature.
+     *                          a value < 1 is special and indicates that the update is intended to
+     *                          delete the finalized feature, and should be accompanied by setting
+     *                          the allowDowngrade flag to true.
+     * @param allowDowngrade    - true, if this feature update was meant to downgrade the existing
+     *                            maximum version level of the finalized feature.
+     *                          - false, otherwise.
+     */
+    public FinalizedFeatureUpdate(
+        final String featureName, final short maxVersionLevel, final boolean allowDowngrade) {
+        Objects.requireNonNull(featureName, "Provided feature name can not be null.");
+        if (maxVersionLevel < 1 && !allowDowngrade) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "For featureName: %s, the allowDowngrade flag is not set when the" +
+                    " provided maxVersionLevel:%d is < 1.", featureName, maxVersionLevel));
+        }
+        this.featureName = featureName;
+        this.maxVersionLevel = maxVersionLevel;
+        this.allowDowngrade = allowDowngrade;
+    }
+
+    /**
+     * @return   the name of the finalized feature to be updated.
+     */
+    public String featureName() {
+        return featureName;
+    }
+
+    /**
+     * @return   the new maximum version level for the finalized feature.
+     */
+    public short maxVersionLevel() {
+        return maxVersionLevel;
+    }
+
+    /**
+     * @return   - true, if this feature update was meant to downgrade the maximum version level of
+     *             the finalized feature.
+     *           - false, otherwise.

Review comment:
       `false otherwise` doesn't provide much useful info.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedFeatureUpdate.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Set;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesRequestData;
+
+/**
+ * Encapsulates details about an update to a finalized feature. This is particularly useful to
+ * define each feature update in the
+ * {@link Admin#updateFinalizedFeatures(Set, UpdateFinalizedFeaturesOptions)} API request.
+ */
+public class FinalizedFeatureUpdate {
+    private final String featureName;
+    private final short maxVersionLevel;
+    private final boolean allowDowngrade;
+
+    /**
+     * @param featureName       the name of the finalized feature to be updated.
+     * @param maxVersionLevel   the new maximum version level for the finalized feature.
+     *                          a value < 1 is special and indicates that the update is intended to
+     *                          delete the finalized feature, and should be accompanied by setting
+     *                          the allowDowngrade flag to true.
+     * @param allowDowngrade    - true, if this feature update was meant to downgrade the existing
+     *                            maximum version level of the finalized feature.
+     *                          - false, otherwise.
+     */
+    public FinalizedFeatureUpdate(
+        final String featureName, final short maxVersionLevel, final boolean allowDowngrade) {
+        Objects.requireNonNull(featureName, "Provided feature name can not be null.");
+        if (maxVersionLevel < 1 && !allowDowngrade) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "For featureName: %s, the allowDowngrade flag is not set when the" +
+                    " provided maxVersionLevel:%d is < 1.", featureName, maxVersionLevel));
+        }
+        this.featureName = featureName;
+        this.maxVersionLevel = maxVersionLevel;
+        this.allowDowngrade = allowDowngrade;
+    }
+
+    /**
+     * @return   the name of the finalized feature to be updated.

Review comment:
       nit: this comment seems unnecessary

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature
+ *    version level deprecation. This is how it works: in order to deprecate feature version levels,
+ *    in this map the default minimum version level of a feature can be set to a new value that's
+ *    higher than 1 (let's call this new_min_version_level). In doing so, the feature version levels
+ *    in the closed range: [1, new_min_version_level - 1] get deprecated by the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    APIKeys.UPDATE_FINALIZED_FEATURES api).
+ *
+ * This class also provides APIs to check for incompatibilities between the features supported by
+ * the Broker and finalized features.
+ *
+ * NOTE: the update*() and clear*() APIs of this class should be used only for testing purposes.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange],
+                              @volatile var defaultFeatureMinVersionLevels: Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+    supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, defaultFeatureMinVersionLevels))
+    supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return          the default minimum version level for the feature if it's defined.
+   *                  otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+    defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, newMinVersionLevels))
+    defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+    Features.finalizedFeatures(
+      supportedFeatures.features.asScala.map {
+        case(name, versionRange) => (
+          name, new FinalizedVersionRange(defaultMinVersionLevel(name), versionRange.max))
+      }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   *     supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities need to be checked for.
+   *
+   * @return            The subset of input features which are incompatible. If the returned object
+   *                    is empty, it means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+    BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, true)

Review comment:
       nit: mark the parameter as `logIncompatibilities = true)`
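
       For reference, the two incompatibility cases described in the Javadoc above can be sketched as follows. This is only an illustration: `Range` is a hypothetical stand-in for the version range classes, and reading "does not match" as "the finalized range is not contained in the supported range" is an assumption, not something the diff confirms.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch only; none of these types are the Kafka classes from the PR.
final class IncompatibilitySketch {
    // Hypothetical stand-in for a [min, max] version range.
    static final class Range {
        final short min;
        final short max;

        Range(int min, int max) {
            this.min = (short) min;
            this.max = (short) max;
        }
    }

    // Returns the names of finalized features that are incompatible with the supported ones.
    static Set<String> incompatibleFeatures(Map<String, Range> supported,
                                            Map<String, Range> finalized) {
        final Set<String> incompatible = new TreeSet<>();
        for (Map.Entry<String, Range> entry : finalized.entrySet()) {
            final Range supportedRange = supported.get(entry.getKey());
            final Range finalizedRange = entry.getValue();
            // Case 1: the feature is unknown to the broker.
            // Case 2 (assumed rule): the finalized range is not contained in the supported range.
            if (supportedRange == null
                    || finalizedRange.min < supportedRange.min
                    || finalizedRange.max > supportedRange.max) {
                incompatible.add(entry.getKey());
            }
        }
        return incompatible;
    }
}
```

       An empty result means no incompatibilities were found, which matches the `@return` description above.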

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedFeatureUpdate.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Set;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesRequestData;
+
+/**
+ * Encapsulates details about an update to a finalized feature. This is particularly useful to
+ * define each feature update in the
+ * {@link Admin#updateFinalizedFeatures(Set, UpdateFinalizedFeaturesOptions)} API request.
+ */
+public class FinalizedFeatureUpdate {
+    private final String featureName;
+    private final short maxVersionLevel;
+    private final boolean allowDowngrade;
+
+    /**
+     * @param featureName       the name of the finalized feature to be updated.
+     * @param maxVersionLevel   the new maximum version level for the finalized feature.
+     *                          a value < 1 is special and indicates that the update is intended to
+     *                          delete the finalized feature, and should be accompanied by setting
+     *                          the allowDowngrade flag to true.
+     * @param allowDowngrade    - true, if this feature update was meant to downgrade the existing
+     *                            maximum version level of the finalized feature.
+     *                          - false, otherwise.
+     */
+    public FinalizedFeatureUpdate(
+        final String featureName, final short maxVersionLevel, final boolean allowDowngrade) {
+        Objects.requireNonNull(featureName, "Provided feature name can not be null.");
+        if (maxVersionLevel < 1 && !allowDowngrade) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "For featureName: %s, the allowDowngrade flag is not set when the" +
+                    " provided maxVersionLevel:%d is < 1.", featureName, maxVersionLevel));
+        }
+        this.featureName = featureName;
+        this.maxVersionLevel = maxVersionLevel;
+        this.allowDowngrade = allowDowngrade;
+    }
+
+    /**
+     * @return   the name of the finalized feature to be updated.
+     */
+    public String featureName() {
+        return featureName;
+    }
+
+    /**
+     * @return   the new maximum version level for the finalized feature.
+     */
+    public short maxVersionLevel() {
+        return maxVersionLevel;
+    }
+
+    /**
+     * @return   - true, if this feature update was meant to downgrade the maximum version level of
+     *             the finalized feature.
+     *           - false, otherwise.
+     */
+    public boolean allowDowngrade() {
+        return allowDowngrade;
+    }
+
+    /**
+     * Helper function that creates {@link UpdateFinalizedFeaturesRequestData} from a set of
+     * {@link FinalizedFeatureUpdate}.
+     *
+     * @param updates   the set of {@link FinalizedFeatureUpdate}
+     *
+     * @return          a newly constructed UpdateFinalizedFeaturesRequestData object
+     */
+    public static UpdateFinalizedFeaturesRequestData createRequest(Set<FinalizedFeatureUpdate> updates) {
+        final UpdateFinalizedFeaturesRequestData.FinalizedFeatureUpdateKeyCollection items
+            = new UpdateFinalizedFeaturesRequestData.FinalizedFeatureUpdateKeyCollection();
+        for (FinalizedFeatureUpdate update : updates) {
+            final UpdateFinalizedFeaturesRequestData.FinalizedFeatureUpdateKey item =
+                new UpdateFinalizedFeaturesRequestData.FinalizedFeatureUpdateKey();
+            item.setName(update.featureName());
+            item.setMaxVersionLevel(update.maxVersionLevel());
+            item.setAllowDowngrade(update.allowDowngrade());
+            items.add(item);
+        }
+        final UpdateFinalizedFeaturesRequestData data = new UpdateFinalizedFeaturesRequestData();
+        data.setFinalizedFeatureUpdates(items);

Review comment:
       nit: we could just `return new UpdateFinalizedFeaturesRequestData().setFinalizedFeatureUpdates(items)`
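
       A minimal sketch of the chained form, for illustration. `RequestDataSketch` is a hypothetical stand-in for the generated `UpdateFinalizedFeaturesRequestData` class, and the sketch assumes the setter returns `this`; the chained form only works if that holds for the real class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a generated request-data class; not the Kafka type.
final class RequestDataSketch {
    private List<String> finalizedFeatureUpdates = new ArrayList<>();

    // Assumption: the setter returns `this`, which is what makes chaining possible.
    RequestDataSketch setFinalizedFeatureUpdates(List<String> updates) {
        this.finalizedFeatureUpdates = updates;
        return this;
    }

    List<String> finalizedFeatureUpdates() {
        return finalizedFeatureUpdates;
    }

    // Builds and returns the request in one expression, with no intermediate local variable.
    static RequestDataSketch createRequest(List<String> items) {
        return new RequestDataSketch().setFinalizedFeatureUpdates(items);
    }
}
```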

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature
+ *    version level deprecation. This is how it works: in order to deprecate feature version levels,
+ *    in this map the default minimum version level of a feature can be set to a new value that's
+ *    higher than 1 (let's call this new_min_version_level). In doing so, the feature version levels
+ *    in the closed range: [1, new_min_version_level - 1] get deprecated by the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    APIKeys.UPDATE_FINALIZED_FEATURES api).

Review comment:
       `ApiKeys.UPDATE_FINALIZED_FEATURES API`

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature
+ *    version level deprecation. This is how it works: in order to deprecate feature version levels,
+ *    in this map the default minimum version level of a feature can be set to a new value that's
+ *    higher than 1 (let's call this new_min_version_level). In doing so, the feature version levels
+ *    in the closed range: [1, new_min_version_level - 1] get deprecated by the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    APIKeys.UPDATE_FINALIZED_FEATURES api).
+ *
+ * This class also provides APIs to check for incompatibilities between the features supported by
+ * the Broker and finalized features.
+ *
+ * NOTE: the update*() and clear*() APIs of this class should be used only for testing purposes.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange],

Review comment:
       Maybe a newbie question: since `supportedFeatures` can be mutated, why can't we just treat its min level as the `defaultFeatureMinVersionLevels`? I'm trying to understand why the secondary bookkeeping is necessary. It might also be good to put the reasoning in the meta comment to avoid confusion.
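
       To make the deprecation rule in the class comment concrete, here is a rough sketch of the arithmetic it describes: raising a feature's default minimum version level to `new_min_version_level` deprecates the levels in `[1, new_min_version_level - 1]`. The class and the `highestDeprecatedLevel` helper are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the BrokerFeatures class from the PR.
final class MinVersionLevelSketch {
    private final Map<String, Short> defaultMinVersionLevels = new HashMap<>();

    void setDefaultMinVersionLevel(String feature, short level) {
        if (level < 1) {
            throw new IllegalArgumentException("The minimum version level must be >= 1.");
        }
        defaultMinVersionLevels.put(feature, level);
    }

    // Mirrors defaultMinVersionLevel in the diff: falls back to 1 when nothing is configured.
    short defaultMinVersionLevel(String feature) {
        return defaultMinVersionLevels.getOrDefault(feature, (short) 1);
    }

    // Hypothetical helper: levels in [1, result] are deprecated; 0 means none are.
    int highestDeprecatedLevel(String feature) {
        return defaultMinVersionLevel(feature) - 1;
    }
}
```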

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final int finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(
+        final Features<FinalizedVersionRange> finalizedFeatures,
+        final int finalizedFeaturesEpoch,
+        final Features<SupportedVersionRange> supportedFeatures
+    ) {
+        Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures can not be null.");
+        Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures can not be null.");
+        this.finalizedFeatures = finalizedFeatures;
+        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    /**
+     * A map of finalized feature versions, with key being finalized feature name and value
+     * containing the min/max version levels for the finalized feature.
+     */
+    public Features<FinalizedVersionRange> finalizedFeatures() {
+        return finalizedFeatures;
+    }
+
+    /**
+     * The epoch for the finalized features.
+     * Valid values are >= 0. A value < 0 means the finalized features are absent/unavailable.

Review comment:
       We should consider using Optional for `finalizedFeaturesEpoch` to indicate absence.
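
       Roughly what I have in mind, as a sketch rather than a concrete proposal. `FeatureMetadataSketch` is a hypothetical, trimmed-down variant of `FeatureMetadata` that only shows the epoch handling, and it assumes the raw value still arrives as an `int` where a value < 0 means absent.

```java
import java.util.Optional;

// Hypothetical, trimmed-down variant of FeatureMetadata; only the epoch handling is shown.
final class FeatureMetadataSketch {
    private final Optional<Integer> finalizedFeaturesEpoch;

    // Assumption: the raw epoch is still an int, with a value < 0 meaning "absent".
    FeatureMetadataSketch(final int rawEpoch) {
        this.finalizedFeaturesEpoch =
            rawEpoch >= 0 ? Optional.of(rawEpoch) : Optional.empty();
    }

    // Callers check for presence instead of comparing against a sentinel value.
    Optional<Integer> finalizedFeaturesEpoch() {
        return finalizedFeaturesEpoch;
    }
}
```

       With this shape, the "< 0 means absent" convention stays in one place instead of leaking to every caller.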

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig,
    */
   private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = {
     try {
+      val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers
+      if (config.isFeatureVersioningEnabled) {
+        def hasIncompatibleFeatures(broker: Broker): Boolean = {
+          val latestFinalizedFeatures = featureCache.get
+          if (latestFinalizedFeatures.isDefined) {
+            BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.get.features)
+          } else {
+            false
+          }
+        }
+        controllerContext.liveOrShuttingDownBrokers.foreach(broker => {
+          if (filteredBrokers.contains(broker.id) && hasIncompatibleFeatures(broker)) {

Review comment:
       Could the receiving broker analyze the request and decide to shut itself down? What do we gain by not sending update metadata requests to incompatible brokers?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,158 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * Enables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with enabled status. This status means the feature versioning system
+   * (KIP-584) is enabled, and, the finalized features stored in the FeatureZNode are active. This
+   * status should be written by the controller to the FeatureZNode only when the broker IBP config
+   * is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    For a new Kafka cluster (i.e. it is deployed first time), we would like to start the cluster
+   *    with all the possible supported features finalized immediately. The new cluster will almost
+   *    never be started with an old IBP config that’s less than KAFKA_2_7_IV0. Assuming this is the
+   *    case, then here is how we it: the controller will start up and notice that the FeatureZNode
+   *    is absent in the new cluster, it will then create a FeatureZNode (with enabled status)
+   *    containing the entire list of default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    Broker binary has been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier version of the Broker
+   *    binary. In this case, we want to start with no finalized features and allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the
+   *    features. The reason to do this is that enabling all the possible features immediately after
+   *    an upgrade could be harmful to the cluster.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. If absent, then it
+   *        will react by creating a FeatureZNode with disabled status and empty finalized features.
+   *        Otherwise, if a node already exists in enabled status then the controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already present with enabled
+   *    status and existing finalized features. In such a case, the controller needs to scan the
+   *    existing finalized features and mutate them for the purpose of version level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is present in the default
+   *    finalized features, then, it's existing minimum version level is updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range of feature version
+   *    levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, which requires a major
+   *    release of Kafka. In such a release, the minimum version level maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map {
+          case (featureName, existingVersionRange) => {
+            val updatedVersionRange = defaultFinalizedFeatures.get(featureName)
+            if (updatedVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with $existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (existingVersionRange.max() >= updatedVersionRange.min()) {
+              // Through this change, we deprecate all version levels in the closed range:
+              // [existingVersionRange.min(), updatedVersionRange.min() - 1]
+              (featureName, new FinalizedVersionRange(updatedVersionRange.min(), existingVersionRange.max()))
+            } else {
+              // This is a special case: If the existing version levels fall completely outside the
+              // range of the default finalized version levels (i.e. no intersection), then, this
+              // case is not eligible for deprecation. This requires that the max version level be
+              // upgraded first to a value that's equal to the the default minimum version level.
+              info(s"Can not update minimum version level in finalized feature: $featureName,"
+              + s" since the existing $existingVersionRange does not intersect with the default"
+              + s" $updatedVersionRange.")
+              (featureName, existingVersionRange)
+            }
+          }
+        }.asJava)
+      }
+      val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures)
+      if (!newFeatureZNode.equals(existingFeatureZNode)) {
+        val newVersion = updateFeatureZNode(newFeatureZNode)
+        featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
+      }
+    }
+  }
+
+  /**
+   * Disables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with disabled status. This status means the feature versioning system
+   * (KIP-584) is disabled, and, the finalized features stored in the FeatureZNode are not relevant.
+   * This status should be written by the controller to the FeatureZNode only when the broker
+   * IBP config is less than KAFKA_2_7_IV0.
+   *
+   * NOTE:
+   * 1. When this method returns, existing finalized features (if any) will be cleared from the
+   *    FeatureZNode.
+   * 2. This method, unlike enableFeatureVersioning() need not wait for the FinalizedFeatureCache
+   *    to be updated, because, such updates to the caceh (via FinalizedFeatureChangeListener)
+   *    are disabled when IBP config is < than KAFKA_2_7_IV0.
+   */
+  private def disableFeatureVersioning(): Unit = {

Review comment:
       Is it ok for us to always do `updateFeatureZNode`, since this call is idempotent?
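
   One thing to weigh here, as a hedged sketch: an unconditional write is idempotent with respect to the node contents, but a ZooKeeper setData call bumps the znode version even when the data is unchanged. The `VersionedNode` class below is a hypothetical in-memory stand-in (not the real zkClient API) that only contrasts always-write with compare-before-write:

```java
import java.util.Objects;

// Hypothetical in-memory stand-in for a versioned znode (not the real zkClient API).
final class VersionedNode {
    private String data;
    private int version;

    VersionedNode(String initialData) {
        this.data = initialData;
        this.version = 0;
    }

    // Mimics ZooKeeper setData: every successful write bumps the version,
    // even when the new data equals the old data.
    int alwaysWrite(String newData) {
        this.data = newData;
        this.version += 1;
        return this.version;
    }

    // Skips the write when nothing would change, so the version stays put.
    int writeIfChanged(String newData) {
        if (Objects.equals(this.data, newData)) {
            return this.version;
        }
        return alwaysWrite(newData);
    }

    String data() { return data; }
    int version() { return version; }
}
```

   If nothing downstream depends on the version staying stable, the unconditional write is the simpler choice.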

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the DescribeFeaturesResult containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. The API is atomic, meaning that if a single
+     * feature update in the request can't succeed on the controller, then none of the feature
+     * updates are carried out. This request is issued only to the controller since the API is
+     * only served by the controller.
+     * <p>
+     * The API takes as input a set of FinalizedFeatureUpdate that need to be applied. Each such
+     * update specifies the finalized feature to be added or updated or deleted, along with the new
+     * max feature version level value.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the feature update has the allowDowngrade flag set - setting this flag
+     * conveys user intent to attempt downgrade of a feature max version level. Note that despite
+     * the allowDowngrade flag being set, certain downgrades may be rejected by the controller if it
+     * is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It is allowed
+     * only if the allowDowngrade flag is set in the feature update, and, if the max version level
+     * is set to a value less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFinalizedFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized feature is attempted

Review comment:
       Looking at `UpdateFinalizedFeaturesResult`, we don't return a per-feature error code. If that is the case, how would the caller know which feature is missing?
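
   To make the question concrete, here is a rough sketch of what a per-feature result could look like. `PerFeatureResult`, its methods, and the feature names are all hypothetical; nothing like this exists in `UpdateFinalizedFeaturesResult` in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical result shape: one optional error message per feature name.
final class PerFeatureResult {
    private final Map<String, String> errorsByFeature = new LinkedHashMap<>();

    void recordError(String featureName, String errorMessage) {
        errorsByFeature.put(featureName, errorMessage);
    }

    Optional<String> errorFor(String featureName) {
        return Optional.ofNullable(errorsByFeature.get(featureName));
    }

    // The update is described as atomic, so any single failure fails the whole request.
    boolean succeeded() {
        return errorsByFeature.isEmpty();
    }
}
```

   Because the API is atomic, a single top-level error may well be enough, as long as the caller can still tell which feature triggered it.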

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature
+ *    version level deprecation. This is how it works: in order to deprecate feature version levels,
+ *    in this map the default minimum version level of a feature can be set to a new value that's
+ *    higher than 1 (let's call this new_min_version_level). In doing so, the feature version levels
+ *    in the closed range: [1, latest_min_version_level - 1] get deprecated by the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    APIKeys.UPDATE_FINALIZED_FEATURES api).
+ *
+ * This class also provides APIs to check for incompatibilities between the features supported by
+ * the Broker and finalized features.
+ *
+ * NOTE: the update*() and clear*() APIs of this class should be used only for testing purposes.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange],
+                              @volatile var defaultFeatureMinVersionLevels: Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+    supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, defaultFeatureMinVersionLevels))
+    supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return          the default minimum version level for the feature if its defined.
+   *                  otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+    defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, newMinVersionLevels))
+    defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+    Features.finalizedFeatures(
+      supportedFeatures.features.asScala.map {
+        case(name, versionRange) => (
+          name, new FinalizedVersionRange(defaultMinVersionLevel(name), versionRange.max))
+      }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   *     supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities need to be checked for.
+   *
+   * @return            The subset of input features which are incompatible. If the returned object
+   *                    is empty, it means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+    BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, true)
+  }
+}
+
+object BrokerFeatures extends Logging {
+
+  def createDefault(): BrokerFeatures = {
+    // The arguments are currently empty, but, in the future as we define features we should
+    // populate the required values here.
+    new BrokerFeatures(emptySupportedFeatures, Map[String, Short]())
+  }
+
+  /**
+   * Returns true if any of the provided finalized features are incompatible with the provided
+   * supported features.
+   *
+   * @param supportedFeatures   The supported features to be compared
+   * @param finalizedFeatures   The finalized features to be compared
+   *
+   * @return                    - True if there are any feature incompatibilities found.
+   *                            - False otherwise.
+   */
+  def hasIncompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
+                              finalizedFeatures: Features[FinalizedVersionRange]): Boolean = {
+    !incompatibleFeatures(supportedFeatures, finalizedFeatures, false).empty
+  }
+
+  private def incompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
+                                   finalizedFeatures: Features[FinalizedVersionRange],
+                                   logIncompatibilities: Boolean): Features[FinalizedVersionRange] = {
+    val incompatibilities = finalizedFeatures.features.asScala.map {
+      case (feature, versionLevels) => {
+        val supportedVersions = supportedFeatures.get(feature)
+        if (supportedVersions == null) {
+          (feature, versionLevels, "{feature=%s, reason='Unsupported feature'}".format(feature))
+        } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+          (feature, versionLevels, "{feature=%s, reason='%s is incompatible with %s'}".format(
+            feature, versionLevels, supportedVersions))
+        } else {
+          (feature, versionLevels, null)
+        }
+      }
+    }.filter{ case(_, _, errorReason) => errorReason != null}.toList
+
+    if (logIncompatibilities && incompatibilities.nonEmpty) {
+      warn(
+        "Feature incompatibilities seen: " + incompatibilities.map{
+          case(_, _, errorReason) => errorReason })
+    }
+    Features.finalizedFeatures(incompatibilities.map{
+      case(feature, versionLevels, _) => (feature, versionLevels) }.toMap.asJava)
+  }
+
+  /**
+   * A check that ensures each feature defined with min version level is a supported feature, and
+   * the min version level value is valid (i.e. it is compatible with the supported version range).
+   *
+   * @param supportedFeatures         the supported features
+   * @param featureMinVersionLevels   the feature minimum version levels
+   *
+   * @return                          - true, if the above described check passes.
+   *                                  - false, otherwise.
+   */
+  private def areFeatureMinVersionLevelsCompatible(
+    supportedFeatures: Features[SupportedVersionRange],
+    featureMinVersionLevels: Map[String, Short]
+  ): Boolean = {
+    featureMinVersionLevels.forall {
+      case(featureName, minVersionLevel) => {

Review comment:
       redundant {}

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedFeatureUpdate.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Set;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesRequestData;
+
+/**
+ * Encapsulates details about an update to a finalized feature. This is particularly useful to
+ * define each feature update in the
+ * {@link Admin#updateFinalizedFeatures(Set, UpdateFinalizedFeaturesOptions)} API request.
+ */
+public class FinalizedFeatureUpdate {
+    private final String featureName;
+    private final short maxVersionLevel;
+    private final boolean allowDowngrade;
+
+    /**
+     * @param featureName       the name of the finalized feature to be updated.
+     * @param maxVersionLevel   the new maximum version level for the finalized feature.
+     *                          a value < 1 is special and indicates that the update is intended to
+     *                          delete the finalized feature, and should be accompanied by setting
+     *                          the allowDowngrade flag to true.
+     * @param allowDowngrade    - true, if this feature update was meant to downgrade the existing
+     *                            maximum version level of the finalized feature.
+     *                          - false, otherwise.
+     */
+    public FinalizedFeatureUpdate(

Review comment:
       nit: one parameter each line, with the first parameter on the same line as constructor name.
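
   The layout I have in mind looks roughly like the sketch below. The parameter names come from the Javadoc above; the constructor body and the accessors are assumptions, since the diff is cut off here, and the class is renamed `FinalizedFeatureUpdateSketch` to mark it as illustrative.

```java
import java.util.Objects;

// Illustrative copy of the class under review; the constructor body is assumed.
final class FinalizedFeatureUpdateSketch {
    private final String featureName;
    private final short maxVersionLevel;
    private final boolean allowDowngrade;

    // First parameter on the constructor line, one parameter per following line.
    FinalizedFeatureUpdateSketch(String featureName,
                                 short maxVersionLevel,
                                 boolean allowDowngrade) {
        this.featureName = Objects.requireNonNull(featureName, "featureName");
        this.maxVersionLevel = maxVersionLevel;
        this.allowDowngrade = allowDowngrade;
    }

    String featureName() { return featureName; }
    short maxVersionLevel() { return maxVersionLevel; }
    boolean allowDowngrade() { return allowDowngrade; }
}
```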

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the DescribeFeaturesResult containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. The API is atomic, meaning that if a single
+     * feature update in the request can't succeed on the controller, then none of the feature
+     * updates are carried out. This request is issued only to the controller since the API is
+     * only served by the controller.
+     * <p>
+     * The API takes as input a set of FinalizedFeatureUpdate that need to be applied. Each such
+     * update specifies the finalized feature to be added or updated or deleted, along with the new
+     * max feature version level value.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the feature update has the allowDowngrade flag set - setting this flag
+     * conveys user intent to attempt downgrade of a feature max version level. Note that despite
+     * the allowDowngrade flag being set, certain downgrades may be rejected by the controller if it
+     * is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It is allowed

Review comment:
       For the entire sentence, I assume you want to say something like
   ```
   It could be done by turning on the allowDowngrade flag and setting the max version level to be less than 1
   ```
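
   Read as a rule, that wording amounts to the check below. This is only a sketch: `UpdateKind` and `UpdateRules.classify` are hypothetical names, and the real validation lives on the broker side, not in a helper like this.

```java
// Hypothetical classification of a feature update, following the Javadoc wording.
enum UpdateKind { DELETE, INVALID_DELETE, ADD_OR_UPDATE }

final class UpdateRules {
    private UpdateRules() { }

    static UpdateKind classify(short maxVersionLevel, boolean allowDowngrade) {
        if (maxVersionLevel < 1) {
            // A max version level below 1 requests deletion, which is only
            // honored when the allowDowngrade flag is also set.
            return allowDowngrade ? UpdateKind.DELETE : UpdateKind.INVALID_DELETE;
        }
        return UpdateKind.ADD_OR_UPDATE;
    }
}
```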

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,158 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * Enables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with enabled status. This status means the feature versioning system
+   * (KIP-584) is enabled, and, the finalized features stored in the FeatureZNode are active. This
+   * status should be written by the controller to the FeatureZNode only when the broker IBP config
+   * is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    For a new Kafka cluster (i.e. it is deployed first time), we would like to start the cluster
+   *    with all the possible supported features finalized immediately. The new cluster will almost
+   *    never be started with an old IBP config that’s less than KAFKA_2_7_IV0. Assuming this is the
+   *    case, then here is how we it: the controller will start up and notice that the FeatureZNode

Review comment:
       `then here is how we it` could be removed:
   `Assuming this is the case, then the controller...`

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2945,6 +2950,137 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFinalizedFeatures(request: RequestChannel.Request): Unit = {
+    val updateFinalizedFeaturesRequest = request.body[UpdateFinalizedFeaturesRequest]
+    def sendResponseCallback(error: Errors, msgOverride: Option[String]): Unit = {
+      sendResponseExemptThrottle(request, new UpdateFinalizedFeaturesResponse(
+        new UpdateFinalizedFeaturesResponseData()
+          .setErrorCode(error.code())
+          .setErrorMessage(msgOverride.getOrElse(error.message()))))
+    }
+
+    if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
+      sendResponseCallback(Errors.CLUSTER_AUTHORIZATION_FAILED, Option.empty)
+    } else if (!controller.isActive) {
+      sendResponseCallback(Errors.NOT_CONTROLLER, Option.empty)
+    } else if (!config.isFeatureVersioningEnabled) {
+      sendResponseCallback(Errors.INVALID_REQUEST, Some("Feature versioning system is disabled."))
+    } else {
+      val targetFeaturesOrError = getTargetFinalizedFeaturesOrError(updateFinalizedFeaturesRequest)
+      targetFeaturesOrError match {
+        case Left(targetFeatures) =>
+          controller.updateFinalizedFeatures(targetFeatures, sendResponseCallback)
+        case Right(error) =>
+          sendResponseCallback(error.error, Some(error.message))
+      }
+    }
+  }
+
+  /**
+   * Validates the provided UpdateFinalizedFeaturesRequest, checking for various error cases.
+   * If the validation is successful, returns the target finalized features constructed from the
+   * request.
+   *
+   * @param request   the request to be validated
+   *
+   * @return          - the target finalized features, if request validation is successful
+   *                  - an ApiError if request validation fails
+   */
+  def getTargetFinalizedFeaturesOrError(request: UpdateFinalizedFeaturesRequest):
+    Either[Features[FinalizedVersionRange], ApiError] = {
+    val updates = request.data.finalizedFeatureUpdates
+    if (updates.isEmpty) {
+      return Right(new ApiError(Errors.INVALID_REQUEST,
+        "Can not provide empty FinalizedFeatureUpdates in the request."))
+    }
+
+    val latestFeatures = featureCache.get
+    val newFeatures = scala.collection.mutable.Map[String, FinalizedVersionRange]()
+    updates.asScala.foreach(
+      update => {
+        // Rule #1) Check that the feature name is not empty.
+        if (update.name.isEmpty) {

Review comment:
       In Scala we usually try to avoid `return`; consider using `if-else` instead.

##########
File path: clients/src/main/resources/common/message/UpdateFinalizedFeaturesResponse.json
##########
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFinalizedFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The error code or `0` if there was no error." },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",

Review comment:
       This is a bit unusual: we normally rely on the error code to propagate information, rather than on a message of unbounded size. Could you explain why we couldn't simply introduce a new error code if the existing ones are not sufficient?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any

Review comment:
       We should say in which circumstances a user may need to send the request directly to the controller. To me, that is the case where the user wants stronger consistency.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2945,6 +2950,137 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFinalizedFeatures(request: RequestChannel.Request): Unit = {
+    val updateFinalizedFeaturesRequest = request.body[UpdateFinalizedFeaturesRequest]
+    def sendResponseCallback(error: Errors, msgOverride: Option[String]): Unit = {
+      sendResponseExemptThrottle(request, new UpdateFinalizedFeaturesResponse(
+        new UpdateFinalizedFeaturesResponseData()
+          .setErrorCode(error.code())
+          .setErrorMessage(msgOverride.getOrElse(error.message()))))

Review comment:
       We don't need to include the same error information twice, as the client side will recognize it anyway.
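
   One way to avoid the duplication, sketched with hypothetical types (`ErrorSketch` and `ResponseSketch` stand in for `Errors` and the generated response data, and the default message text is made up): leave the message null unless the caller supplies an override, and let the client fall back to the default message for the code. The response schema already marks ErrorMessage as nullable.

```java
import java.util.Optional;

// Hypothetical stand-in for an error enum; the message text is illustrative only.
enum ErrorSketch {
    NONE((short) 0, null),
    INVALID_REQUEST((short) 42, "The request is invalid.");

    final short code;
    final String defaultMessage;

    ErrorSketch(short code, String defaultMessage) {
        this.code = code;
        this.defaultMessage = defaultMessage;
    }
}

// Hypothetical stand-in for the response object.
final class ResponseSketch {
    final short errorCode;
    final String errorMessage; // null means "use the default message for errorCode"

    private ResponseSketch(short errorCode, String errorMessage) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }

    // Only an explicit override is put on the wire; the default is never copied in.
    static ResponseSketch of(ErrorSketch error, Optional<String> msgOverride) {
        return new ResponseSketch(error.code, msgOverride.orElse(null));
    }

    // Client-side view: fall back to the default message derived from the code.
    String messageOr(ErrorSketch error) {
        return errorMessage != null ? errorMessage : error.defaultMessage;
    }
}
```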

##########
File path: core/src/test/scala/unit/kafka/server/UpdateFinalizedFeaturesTest.scala
##########
@@ -0,0 +1,450 @@
+package kafka.server

Review comment:
       Missing license header.

##########
File path: clients/src/main/resources/common/message/UpdateFinalizedFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFinalizedFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FinalizedFeatureUpdates", "type": "[]FinalizedFeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Name", "type":  "string", "versions":  "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name":  "MaxVersionLevel", "type":  "int16", "versions":  "0+",

Review comment:
       nit: extra spaces after the colons.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -82,18 +108,55 @@ object FinalizedFeatureCache extends Logging {
         " The existing cache contents are %s").format(latest, oldFeatureAndEpoch)
       throw new FeatureCacheUpdateException(errorMsg)
     } else {
-      val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features)
+      val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features)
       if (!incompatibleFeatures.empty) {
         val errorMsg = ("FinalizedFeatureCache update failed since feature compatibility" +
           " checks failed! Supported %s has incompatibilities with the latest %s."
-          ).format(SupportedFeatures.get, latest)
+          ).format(brokerFeatures.supportedFeatures, latest)
         throw new FeatureCacheUpdateException(errorMsg)
       } else {
-        val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format(
+        val logMsg = "Updated cache from existing %s to latest %s".format(
           oldFeatureAndEpoch, latest)
-        featuresAndEpoch = Some(latest)
+        synchronized {
+          featuresAndEpoch = Some(latest)
+          notifyAll()
+        }
         info(logMsg)
       }
     }
   }
+
+  /**
+   * Causes the current thread to wait no more than timeoutMs for the specified condition to be met.
+   * It is guaranteed that the provided condition will always be invoked only from within a
+   * synchronized block.
+   *
+   * @param waitCondition   the condition to be waited upon:
+   *                         - if the condition returns true, then, the wait will stop.
+   *                         - if the condition returns false, it means the wait must continue until
+   *                           timeout.
+   *
+   * @param timeoutMs       the timeout (in milli seconds)
+   *
+   * @throws                TimeoutException if the condition is not met within timeoutMs.
+   */
+  private def waitUntilConditionOrThrow(waitCondition: () => Boolean, timeoutMs: Long): Unit = {
+    if(timeoutMs < 0L) {
+      throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but $timeoutMs was provided.")
+    }
+    synchronized {
+      var sleptTimeMs = 0L
+      while (!waitCondition()) {
+        val timeoutLeftMs = timeoutMs - sleptTimeMs
+        if (timeoutLeftMs <= 0) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${timeoutMs}ms for required condition to be met." +
+              s" Current epoch: ${featuresAndEpoch.map(fe => fe.epoch).getOrElse("<none>")}.")
+        }
+        val timeBeforeNanos = System.nanoTime

Review comment:
       Do we need this level of precision for the wait time? Could we just record the start time of the function and compare it against the current system time to detect expiration?
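
   A rough sketch of the deadline-based approach, in Java for brevity: compute the deadline once on entry, then compare it against the clock on each loop iteration instead of accumulating slept time. `EpochHolder` and its methods are hypothetical names standing in for the cache, and the sketch assumes timeouts small enough that the millisecond-to-nanosecond conversion does not overflow.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical holder standing in for the cache that tracks the latest epoch.
final class EpochHolder {
    private long epoch = -1L;

    synchronized void setEpoch(long newEpoch) {
        epoch = newEpoch;
        notifyAll(); // wake up any thread blocked in waitUntil()
    }

    synchronized long epoch() {
        return epoch;
    }

    // Waits until the condition holds, or throws once the deadline has passed.
    synchronized void waitUntil(BooleanSupplier condition, long timeoutMs)
            throws InterruptedException, TimeoutException {
        if (timeoutMs < 0L) {
            throw new IllegalArgumentException(
                "Expected timeoutMs >= 0, but " + timeoutMs + " was provided.");
        }
        // The deadline is computed once; no per-iteration bookkeeping is needed.
        final long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0L) {
                throw new TimeoutException(
                    "Timed out after waiting for " + timeoutMs + "ms. Current epoch: " + epoch);
            }
            // wait(0) would block indefinitely, so round up to at least 1 ms.
            wait(Math.max(1L, remainingNanos / 1_000_000L));
        }
    }
}
```

   The sketch uses `System.nanoTime` rather than wall-clock time because it is monotonic; the loop still re-checks the condition after every wakeup, so spurious wakeups are harmless.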

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -177,15 +214,33 @@ public static ApiVersionsResponse createApiVersionsResponse(
             }
         }
 
+        return new ApiVersionsResponse(
+            createApiVersionsResponseData(
+                throttleTimeMs,
+                Errors.NONE,
+                apiKeys,
+                latestSupportedFeatures,
+                finalizedFeatures,
+                finalizedFeaturesEpoch));
+    }
+
+    public static ApiVersionsResponseData createApiVersionsResponseData(

Review comment:
       This helper seems unnecessary, as it doesn't reduce the code length.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,158 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * Enables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with enabled status. This status means the feature versioning system
+   * (KIP-584) is enabled, and, the finalized features stored in the FeatureZNode are active. This
+   * status should be written by the controller to the FeatureZNode only when the broker IBP config
+   * is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    For a new Kafka cluster (i.e. it is deployed first time), we would like to start the cluster
+   *    with all the possible supported features finalized immediately. The new cluster will almost
+   *    never be started with an old IBP config that’s less than KAFKA_2_7_IV0. Assuming this is the
+   *    case, then here is how we it: the controller will start up and notice that the FeatureZNode
+   *    is absent in the new cluster, it will then create a FeatureZNode (with enabled status)
+   *    containing the entire list of default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    Broker binary has been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier version of the Broker
+   *    binary. In this case, we want to start with no finalized features and allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the
+   *    features. The reason to do this is that enabling all the possible features immediately after
+   *    an upgrade could be harmful to the cluster.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. If absent, then it
+   *        will react by creating a FeatureZNode with disabled status and empty finalized features.
+   *        Otherwise, if a node already exists in enabled status then the controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already present with enabled
+   *    status and existing finalized features. In such a case, the controller needs to scan the
+   *    existing finalized features and mutate them for the purpose of version level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is present in the default
+   *    finalized features, then, it's existing minimum version level is updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range of feature version
+   *    levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, which requires a major
+   *    release of Kafka. In such a release, the minimum version level maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map {
+          case (featureName, existingVersionRange) => {
+            val updatedVersionRange = defaultFinalizedFeatures.get(featureName)
+            if (updatedVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with $existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (existingVersionRange.max() >= updatedVersionRange.min()) {

Review comment:
       What about the case where `existingVersionRange.min() > updatedVersionRange.max()` is true? For example:
   ```
   existing: [4, 5]
   updated: [1, 2]
   ```
   Are we enabling version 3 as well?
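
   To make the concern concrete, here is a small sketch in Java. It assumes that the branch taken when `existingVersionRange.max() >= updatedVersionRange.min()` produces the range `[updated.min, existing.max]`; that is an assumption for illustration, and `Range` and `merge` are hypothetical names rather than anything in this PR.

```java
// Hypothetical closed range [min, max] of feature version levels.
final class Range {
    final int min;
    final int max;

    Range(int min, int max) {
        if (min > max) {
            throw new IllegalArgumentException("min must be <= max");
        }
        this.min = min;
        this.max = max;
    }

    boolean contains(int level) {
        return min <= level && level <= max;
    }

    // Assumed behavior of the branch under review: keep the existing max,
    // but move the min to the updated (default) min.
    static Range merge(Range existing, Range updated) {
        if (existing.max >= updated.min) {
            return new Range(updated.min, existing.max);
        }
        // Placeholder for the remaining branch, which is not covered by this sketch.
        return existing;
    }
}
```

   Under that assumption, merging existing `[4, 5]` with updated `[1, 2]` gives `[1, 5]`, which contains level 3 even though neither input range does.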

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2945,6 +2950,137 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFinalizedFeatures(request: RequestChannel.Request): Unit = {
+    val updateFinalizedFeaturesRequest = request.body[UpdateFinalizedFeaturesRequest]
+    def sendResponseCallback(error: Errors, msgOverride: Option[String]): Unit = {
+      sendResponseExemptThrottle(request, new UpdateFinalizedFeaturesResponse(
+        new UpdateFinalizedFeaturesResponseData()
+          .setErrorCode(error.code())
+          .setErrorMessage(msgOverride.getOrElse(error.message()))))
+    }
+
+    if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
+      sendResponseCallback(Errors.CLUSTER_AUTHORIZATION_FAILED, Option.empty)
+    } else if (!controller.isActive) {
+      sendResponseCallback(Errors.NOT_CONTROLLER, Option.empty)
+    } else if (!config.isFeatureVersioningEnabled) {
+      sendResponseCallback(Errors.INVALID_REQUEST, Some("Feature versioning system is disabled."))
+    } else {
+      val targetFeaturesOrError = getTargetFinalizedFeaturesOrError(updateFinalizedFeaturesRequest)
+      targetFeaturesOrError match {
+        case Left(targetFeatures) =>
+          controller.updateFinalizedFeatures(targetFeatures, sendResponseCallback)
+        case Right(error) =>
+          sendResponseCallback(error.error, Some(error.message))
+      }
+    }
+  }
+
+  /**
+   * Validates the provided UpdateFinalizedFeaturesRequest, checking for various error cases.
+   * If the validation is successful, returns the target finalized features constructed from the
+   * request.
+   *
+   * @param request   the request to be validated
+   *
+   * @return          - the target finalized features, if request validation is successful
+   *                  - an ApiError if request validation fails
+   */
+  def getTargetFinalizedFeaturesOrError(request: UpdateFinalizedFeaturesRequest):
+    Either[Features[FinalizedVersionRange], ApiError] = {
+    val updates = request.data.finalizedFeatureUpdates
+    if (updates.isEmpty) {
+      return Right(new ApiError(Errors.INVALID_REQUEST,
+        "Can not provide empty FinalizedFeatureUpdates in the request."))
+    }
+
+    val latestFeatures = featureCache.get
+    val newFeatures = scala.collection.mutable.Map[String, FinalizedVersionRange]()
+    updates.asScala.foreach(
+      update => {
+        // Rule #1) Check that the feature name is not empty.
+        if (update.name.isEmpty) {
+          return Right(
+            new ApiError(Errors.INVALID_REQUEST,
+              "Can not contain empty feature name in the request."))
+        }
+
+        val cacheEntry = latestFeatures.map(lf => lf.features.get(update.name)).orNull
+
+        // We handle deletion requests separately from non-deletion requests.
+        if (UpdateFinalizedFeaturesRequest.isDeleteRequest(update)) { // Deletion request
+          // Rule #2) Disallow deletion of a finalized feature without allowDowngrade flag set.
+          if (!update.allowDowngrade) {
+            return Right(
+              new ApiError(Errors.INVALID_REQUEST,
+                s"Can not delete feature: '${update.name}' without setting the" +
+                  " allowDowngrade flag in the request."))

Review comment:
       Suggest rewording the message to say: `setting the allowDowngrade flag to true in the request`
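
   For illustration, a small Java sketch of this rule with the reworded message. `FeatureUpdate` and `DeletionRule` are hypothetical stand-ins, and the sketch assumes a deletion is signalled by a max version level below 1, as the Admin javadoc in this PR describes.

```java
import java.util.Optional;

// Hypothetical stand-in for one entry of FinalizedFeatureUpdates.
final class FeatureUpdate {
    final String name;
    final short maxVersionLevel;
    final boolean allowDowngrade;

    FeatureUpdate(String name, int maxVersionLevel, boolean allowDowngrade) {
        this.name = name;
        this.maxVersionLevel = (short) maxVersionLevel;
        this.allowDowngrade = allowDowngrade;
    }

    // Assumption: a max version level below 1 means "delete this feature".
    boolean isDeleteRequest() {
        return maxVersionLevel < 1;
    }
}

final class DeletionRule {
    // Returns an error message if the deletion is not permitted, else empty.
    static Optional<String> validate(FeatureUpdate update) {
        if (update.isDeleteRequest() && !update.allowDowngrade) {
            return Optional.of("Can not delete feature: '" + update.name
                + "' without setting the allowDowngrade flag to true in the request.");
        }
        return Optional.empty();
    }
}
```

   The explicit "to true" tells the caller exactly which value the flag must have.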

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature
+ *    version level deprecation. This is how it works: in order to deprecate feature version levels,
+ *    in this map the default minimum version level of a feature can be set to a new value that's
+ *    higher than 1 (let's call this new_min_version_level). In doing so, the feature version levels
+ *    in the closed range: [1, latest_min_version_level - 1] get deprecated by the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    APIKeys.UPDATE_FINALIZED_FEATURES api).
+ *
+ * This class also provides APIs to check for incompatibilities between the features supported by
+ * the Broker and finalized features.
+ *
+ * NOTE: the update*() and clear*() APIs of this class should be used only for testing purposes.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange],
+                              @volatile var defaultFeatureMinVersionLevels: Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+    supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, defaultFeatureMinVersionLevels))
+    supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return          the default minimum version level for the feature if its defined.
+   *                  otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+    defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, newMinVersionLevels))
+    defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+    Features.finalizedFeatures(
+      supportedFeatures.features.asScala.map {
+        case(name, versionRange) => (
+          name, new FinalizedVersionRange(defaultMinVersionLevel(name), versionRange.max))
+      }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   *     supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities need to be checked for.
+   *
+   * @return            The subset of input features which are incompatible. If the returned object
+   *                    is empty, it means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+    BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, true)
+  }
+}
+
+object BrokerFeatures extends Logging {
+
+  def createDefault(): BrokerFeatures = {
+    // The arguments are currently empty, but, in the future as we define features we should
+    // populate the required values here.
+    new BrokerFeatures(emptySupportedFeatures, Map[String, Short]())
+  }
+
+  /**
+   * Returns true if any of the provided finalized features are incompatible with the provided
+   * supported features.
+   *
+   * @param supportedFeatures   The supported features to be compared
+   * @param finalizedFeatures   The finalized features to be compared
+   *
+   * @return                    - True if there are any feature incompatibilities found.
+   *                            - False otherwise.
+   */
+  def hasIncompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
+                              finalizedFeatures: Features[FinalizedVersionRange]): Boolean = {
+    !incompatibleFeatures(supportedFeatures, finalizedFeatures, false).empty
+  }
+
+  private def incompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
+                                   finalizedFeatures: Features[FinalizedVersionRange],
+                                   logIncompatibilities: Boolean): Features[FinalizedVersionRange] = {
+    val incompatibilities = finalizedFeatures.features.asScala.map {
+      case (feature, versionLevels) => {

Review comment:
       nit: the `{}` after `=>` is redundant.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2945,6 +2950,137 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFinalizedFeatures(request: RequestChannel.Request): Unit = {
+    val updateFinalizedFeaturesRequest = request.body[UpdateFinalizedFeaturesRequest]
+    def sendResponseCallback(error: Errors, msgOverride: Option[String]): Unit = {
+      sendResponseExemptThrottle(request, new UpdateFinalizedFeaturesResponse(
+        new UpdateFinalizedFeaturesResponseData()
+          .setErrorCode(error.code())
+          .setErrorMessage(msgOverride.getOrElse(error.message()))))
+    }
+
+    if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
+      sendResponseCallback(Errors.CLUSTER_AUTHORIZATION_FAILED, Option.empty)
+    } else if (!controller.isActive) {
+      sendResponseCallback(Errors.NOT_CONTROLLER, Option.empty)
+    } else if (!config.isFeatureVersioningEnabled) {
+      sendResponseCallback(Errors.INVALID_REQUEST, Some("Feature versioning system is disabled."))
+    } else {
+      val targetFeaturesOrError = getTargetFinalizedFeaturesOrError(updateFinalizedFeaturesRequest)
+      targetFeaturesOrError match {
+        case Left(targetFeatures) =>
+          controller.updateFinalizedFeatures(targetFeatures, sendResponseCallback)
+        case Right(error) =>
+          sendResponseCallback(error.error, Some(error.message))
+      }
+    }
+  }
+
+  /**
+   * Validates the provided UpdateFinalizedFeaturesRequest, checking for various error cases.
+   * If the validation is successful, returns the target finalized features constructed from the
+   * request.
+   *
+   * @param request   the request to be validated
+   *
+   * @return          - the target finalized features, if request validation is successful
+   *                  - an ApiError if request validation fails
+   */
+  def getTargetFinalizedFeaturesOrError(request: UpdateFinalizedFeaturesRequest):
+    Either[Features[FinalizedVersionRange], ApiError] = {
+    val updates = request.data.finalizedFeatureUpdates
+    if (updates.isEmpty) {
+      return Right(new ApiError(Errors.INVALID_REQUEST,
+        "Can not provide empty FinalizedFeatureUpdates in the request."))
+    }
+
+    val latestFeatures = featureCache.get
+    val newFeatures = scala.collection.mutable.Map[String, FinalizedVersionRange]()
+    updates.asScala.foreach(
+      update => {
+        // Rule #1) Check that the feature name is not empty.
+        if (update.name.isEmpty) {
+          return Right(
+            new ApiError(Errors.INVALID_REQUEST,
+              "Can not contain empty feature name in the request."))
+        }
+
+        val cacheEntry = latestFeatures.map(lf => lf.features.get(update.name)).orNull
+
+        // We handle deletion requests separately from non-deletion requests.
+        if (UpdateFinalizedFeaturesRequest.isDeleteRequest(update)) { // Deletion request

Review comment:
       nit: the trailing `// Deletion request` comment on L3011 seems unnecessary.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the DescribeFeaturesResult containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. The API is atomic, meaning that if a single
+     * feature update in the request can't succeed on the controller, then none of the feature
+     * updates are carried out. This request is issued only to the controller since the API is
+     * only served by the controller.
+     * <p>
+     * The API takes as input a set of FinalizedFeatureUpdate that need to be applied. Each such
+     * update specifies the finalized feature to be added or updated or deleted, along with the new
+     * max feature version level value.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the feature update has the allowDowngrade flag set - setting this flag
+     * conveys user intent to attempt downgrade of a feature max version level. Note that despite
+     * the allowDowngrade flag being set, certain downgrades may be rejected by the controller if it
+     * is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It is allowed
+     * only if the allowDowngrade flag is set in the feature update, and, if the max version level
+     * is set to a value less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFinalizedFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized feature is attempted
+     *   to be deleted or downgraded.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the updates could finish. It cannot be guaranteed whether
+     *   the updates succeeded or not.</li>
+     *   <li>{@link org.apache.kafka.common.errors.FinalizedFeatureUpdateFailedException}
+     *   If the updates could not be applied on the controller, despite the request being valid.
+     *   This may be a temporary problem.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+
+     * @param featureUpdates   the set of finalized feature updates
+     * @param options          the options to use
+     *
+     * @return                 the UpdateFinalizedFeaturesResult containing the result
+     */
+    UpdateFinalizedFeaturesResult updateFinalizedFeatures(

Review comment:
       The definition doesn't seem to be aligned with the KIP, which states `updateFeatures`. Do you think it's necessary to mention `finalized` in all the function signatures?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2945,6 +2950,137 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFinalizedFeatures(request: RequestChannel.Request): Unit = {
+    val updateFinalizedFeaturesRequest = request.body[UpdateFinalizedFeaturesRequest]
+    def sendResponseCallback(error: Errors, msgOverride: Option[String]): Unit = {
+      sendResponseExemptThrottle(request, new UpdateFinalizedFeaturesResponse(
+        new UpdateFinalizedFeaturesResponseData()
+          .setErrorCode(error.code())
+          .setErrorMessage(msgOverride.getOrElse(error.message()))))
+    }
+
+    if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
+      sendResponseCallback(Errors.CLUSTER_AUTHORIZATION_FAILED, Option.empty)
+    } else if (!controller.isActive) {
+      sendResponseCallback(Errors.NOT_CONTROLLER, Option.empty)
+    } else if (!config.isFeatureVersioningEnabled) {
+      sendResponseCallback(Errors.INVALID_REQUEST, Some("Feature versioning system is disabled."))
+    } else {
+      val targetFeaturesOrError = getTargetFinalizedFeaturesOrError(updateFinalizedFeaturesRequest)
+      targetFeaturesOrError match {
+        case Left(targetFeatures) =>
+          controller.updateFinalizedFeatures(targetFeatures, sendResponseCallback)
+        case Right(error) =>
+          sendResponseCallback(error.error, Some(error.message))
+      }
+    }
+  }
+
+  /**
+   * Validates the provided UpdateFinalizedFeaturesRequest, checking for various error cases.
+   * If the validation is successful, returns the target finalized features constructed from the
+   * request.
+   *
+   * @param request   the request to be validated
+   *
+   * @return          - the target finalized features, if request validation is successful
+   *                  - an ApiError if request validation fails
+   */
+  def getTargetFinalizedFeaturesOrError(request: UpdateFinalizedFeaturesRequest):

Review comment:
       This method should be private.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the DescribeFeaturesResult containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. The API is atomic, meaning that if a single
+     * feature update in the request can't succeed on the controller, then none of the feature
+     * updates are carried out. This request is issued only to the controller since the API is
+     * only served by the controller.
+     * <p>
+     * The API takes as input a set of FinalizedFeatureUpdate that need to be applied. Each such
+     * update specifies the finalized feature to be added or updated or deleted, along with the new
+     * max feature version level value.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the feature update has the allowDowngrade flag set - setting this flag
+     * conveys user intent to attempt downgrade of a feature max version level. Note that despite
+     * the allowDowngrade flag being set, certain downgrades may be rejected by the controller if it
+     * is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It is allowed
+     * only if the allowDowngrade flag is set in the feature update, and, if the max version level
+     * is set to a value less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFinalizedFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized feature is attempted
+     *   to be deleted or downgraded.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the updates could finish. It cannot be guaranteed whether
+     *   the updates succeeded or not.</li>
+     *   <li>{@link org.apache.kafka.common.errors.FinalizedFeatureUpdateFailedException}
+     *   If the updates could not be applied on the controller, despite the request being valid.
+     *   This may be a temporary problem.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+
+     * @param featureUpdates   the set of finalized feature updates
+     * @param options          the options to use
+     *
+     * @return                 the UpdateFinalizedFeaturesResult containing the result
+     */
+    UpdateFinalizedFeaturesResult updateFinalizedFeatures(

Review comment:
       nit: put first parameter on this line.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/UpdateFinalizedFeaturesResponse.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class UpdateFinalizedFeaturesResponse extends AbstractResponse {

Review comment:
       It would be good to copy the expected error codes over from the `Admin.java` definition (even though that is redundant), similar to other response classes such as `OffsetCommitResponse`.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1825,36 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def processUpdateFinalizedFeatures(newFeatures: Features[FinalizedVersionRange],
+                                             callback: UpdateFinalizedFeaturesCallback): Unit = {
+    if (isActive) {
+      val incompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.filter(broker => {
+        BrokerFeatures.hasIncompatibleFeatures(broker.features, newFeatures)
+      })
+      if (incompatibleBrokers.size > 0) {
+        callback(
+          Errors.INVALID_REQUEST,
+          Some(
+            s"Could not apply finalized feature updates because ${incompatibleBrokers.size} brokers"
+            + s" were found to have incompatible features. newFeatures: $newFeatures"
+           + s", incompatibleBrokers: $incompatibleBrokers."))
+      } else {
+        try {
+          val newVersion = zkClient.updateFeatureZNode(
+            new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures))
+          featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)

Review comment:
       Could we avoid blocking controller processing here, e.g. by putting the callback into a delayed queue or something similar?
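
One possible shape for this, as a rough Java sketch rather than a concrete proposal for this PR: park the response callback until the target epoch has been observed, instead of blocking the controller thread on the wait. `PendingEpochCallbacks` and its methods are hypothetical names that do not exist in the codebase, and timeout handling is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper: parks callbacks until a target epoch has been observed.
final class PendingEpochCallbacks {
    private final TreeMap<Integer, List<Runnable>> pending = new TreeMap<>();
    private int currentEpoch = -1;

    // Runs the callback right away if the epoch was already reached, else parks it.
    void whenEpochReached(int targetEpoch, Runnable callback) {
        synchronized (this) {
            if (currentEpoch < targetEpoch) {
                pending.computeIfAbsent(targetEpoch, k -> new ArrayList<>()).add(callback);
                return;
            }
        }
        callback.run();
    }

    // Called by whoever observes a new epoch (assumed here: a cache update listener).
    void onEpochUpdated(int newEpoch) {
        List<Runnable> ready = new ArrayList<>();
        synchronized (this) {
            currentEpoch = Math.max(currentEpoch, newEpoch);
            // All parked callbacks whose target epoch is <= the current epoch.
            NavigableMap<Integer, List<Runnable>> reached = pending.headMap(currentEpoch, true);
            reached.values().forEach(ready::addAll);
            reached.clear();
        }
        // Run callbacks outside the lock so they cannot stall other registrations.
        ready.forEach(Runnable::run);
    }
}
```

With something like this, the update path would register the response callback via `whenEpochReached(newVersion, ...)` and return immediately. A real version would also need to expire parked callbacks after a timeout.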

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3984,6 +3988,108 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call callViaLeastLoadedNode = new Call("describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else {
+                    future.completeExceptionally(
+                        Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);
+            }
+        };
+
+        Call call = callViaLeastLoadedNode;
+        if (options.sendRequestToController()) {
+            call = new Call("describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+
+                @Override
+                ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                    return (ApiVersionsRequest.Builder) callViaLeastLoadedNode.createRequest(timeoutMs);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse response) {
+                    final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                    if (apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                        handleNotControllerError(Errors.NOT_CONTROLLER);
+                    } else {
+                        callViaLeastLoadedNode.handleResponse(response);

Review comment:
       It looks odd to complete `callViaLeastLoadedNode` in a controller response handler. I'm inclined to accept a bit more code duplication and branch on `if (options.sendRequestToController())` to have two separate request traces, like:
   ```
   if (options.sendRequestToController()) {
       ...
       runnable.call(callControllerNode, now);
   } else {
       ...
       runnable.call(callViaLeastLoadedNode, now);
   }
   ```
   and try to complete the same future.
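
To make the suggestion a bit more concrete, here is a self-contained Java sketch of two independent response handlers that complete the same future. It is only an illustration: `DescribeFeaturesSketch`, `Response` and `ErrorCode` are hypothetical stand-ins, and `CompletableFuture` is used in place of `KafkaFutureImpl`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-ins for the admin client types; not the real Kafka classes.
final class DescribeFeaturesSketch {
    enum ErrorCode { NONE, NOT_CONTROLLER, OTHER }

    static final class Response {
        final ErrorCode error;
        final String features;

        Response(ErrorCode error, String features) {
            this.error = error;
            this.features = features;
        }
    }

    // Shared completion logic: both request paths complete the same future.
    static void completeFrom(Response response, CompletableFuture<String> future) {
        if (response.error == ErrorCode.NONE) {
            future.complete(response.features);
        } else {
            future.completeExceptionally(new IllegalStateException("error: " + response.error));
        }
    }

    // Picks one of two separate handlers based on the option, with no delegation between them.
    static Consumer<Response> responseHandler(boolean sendRequestToController,
                                              CompletableFuture<String> future,
                                              Runnable retryOnNewController) {
        if (sendRequestToController) {
            return response -> {
                if (response.error == ErrorCode.NOT_CONTROLLER) {
                    // Only the controller path needs to handle a stale controller.
                    retryOnNewController.run();
                } else {
                    completeFrom(response, future);
                }
            };
        } else {
            return response -> completeFrom(response, future);
        }
    }
}
```

The only shared piece is the small completion helper, so neither handler has to call into the other one.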

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,158 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * Enables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with enabled status. This status means the feature versioning system
+   * (KIP-584) is enabled, and, the finalized features stored in the FeatureZNode are active. This
+   * status should be written by the controller to the FeatureZNode only when the broker IBP config
+   * is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    For a new Kafka cluster (i.e. it is deployed first time), we would like to start the cluster
+   *    with all the possible supported features finalized immediately. The new cluster will almost
+   *    never be started with an old IBP config that’s less than KAFKA_2_7_IV0. Assuming this is the
+   *    case, then here is how we it: the controller will start up and notice that the FeatureZNode
+   *    is absent in the new cluster, it will then create a FeatureZNode (with enabled status)
+   *    containing the entire list of default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    Broker binary has been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier version of the Broker
+   *    binary. In this case, we want to start with no finalized features and allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the
+   *    features. The reason to do this is that enabling all the possible features immediately after
+   *    an upgrade could be harmful to the cluster.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. If absent, then it
+   *        will react by creating a FeatureZNode with disabled status and empty finalized features.
+   *        Otherwise, if a node already exists in enabled status then the controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already present with enabled
+   *    status and existing finalized features. In such a case, the controller needs to scan the
+   *    existing finalized features and mutate them for the purpose of version level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is present in the default
+   *    finalized features, then, it's existing minimum version level is updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range of feature version
+   *    levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, which requires a major
+   *    release of Kafka. In such a release, the minimum version level maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map {
+          case (featureName, existingVersionRange) => {
+            val updatedVersionRange = defaultFinalizedFeatures.get(featureName)

Review comment:
       If this is the feature set required by the broker, I think we could name it something like `brokerRequiredVersionRange`. `Updated` is a bit vague, since the reader cannot infer what was updated.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1825,36 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def processUpdateFinalizedFeatures(newFeatures: Features[FinalizedVersionRange],
+                                             callback: UpdateFinalizedFeaturesCallback): Unit = {
+    if (isActive) {
+      val incompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.filter(broker => {
+        BrokerFeatures.hasIncompatibleFeatures(broker.features, newFeatures)
+      })
+      if (incompatibleBrokers.size > 0) {

Review comment:
       Replace with `nonEmpty`

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature
+ *    version level deprecation. This is how it works: in order to deprecate feature version levels,
+ *    in this map the default minimum version level of a feature can be set to a new value that's
+ *    higher than 1 (let's call this new_min_version_level). In doing so, the feature version levels
+ *    in the closed range: [1, latest_min_version_level - 1] get deprecated by the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    APIKeys.UPDATE_FINALIZED_FEATURES api).
+ *
+ * This class also provides APIs to check for incompatibilities between the features supported by
+ * the Broker and finalized features.
+ *
+ * NOTE: the update*() and clear*() APIs of this class should be used only for testing purposes.

Review comment:
       We only need the "for testing only" comment on the functions themselves.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,158 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * Enables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with enabled status. This status means the feature versioning system
+   * (KIP-584) is enabled, and, the finalized features stored in the FeatureZNode are active. This
+   * status should be written by the controller to the FeatureZNode only when the broker IBP config
+   * is greater than or equal to KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    For a new Kafka cluster (i.e. it is deployed first time), we would like to start the cluster
+   *    with all the possible supported features finalized immediately. The new cluster will almost

Review comment:
       `For a new Kafka cluster (i.e. it is deployed first time), we would like to start the cluster with all the possible supported features finalized immediately.`
   I think this comment is hard to understand if the reader has no context on feature versioning. It would be good to include a short explanation of what a `supported feature` is, and what it means for a feature to be `finalized`.
   `The new cluster will almost never be started with an old IBP config that’s less than KAFKA_2_7_IV0.`
   This sentence is positioned awkwardly. I would suggest rephrasing it along the lines of `As a new cluster starting with IBP setting equal to or greater than KAFKA_2_7_IV0`.
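
As an illustration of the distinction such an explanation might draw, here is a small Java sketch. It assumes that a supported feature is a version range advertised by an individual broker, that a finalized feature is a version range agreed on for the whole cluster, and that "incompatible" means the feature is unknown to the broker or the finalized range is not contained in the supported range. `FeatureCompatibilitySketch` and `VersionRange` are hypothetical names, not the classes in this PR.

```java
import java.util.Map;

// Hypothetical illustration of supported vs. finalized feature version ranges.
final class FeatureCompatibilitySketch {
    static final class VersionRange {
        final int min;
        final int max;

        VersionRange(int min, int max) {
            if (min < 1 || max < min) {
                throw new IllegalArgumentException("expected 1 <= min <= max");
            }
            this.min = min;
            this.max = max;
        }
    }

    // Assumption: a finalized range is incompatible with a broker if the feature is
    // unknown to that broker, or the range is not contained in the supported range.
    static boolean isIncompatible(Map<String, VersionRange> supportedByBroker,
                                  String feature,
                                  VersionRange finalized) {
        VersionRange supported = supportedByBroker.get(feature);
        return supported == null
            || finalized.min < supported.min
            || finalized.max > supported.max;
    }
}
```

For example, a broker supporting versions 1 to 3 of a feature would be compatible with a finalized range of 1 to 2, but not with a finalized range of 2 to 4.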

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature
+ *    version level deprecation. This is how it works: in order to deprecate feature version levels,
+ *    in this map the default minimum version level of a feature can be set to a new value that's
+ *    higher than 1 (let's call this new_min_version_level). In doing so, the feature version levels
+ *    in the closed range: [1, latest_min_version_level - 1] get deprecated by the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    APIKeys.UPDATE_FINALIZED_FEATURES api).
+ *
+ * This class also provides APIs to check for incompatibilities between the features supported by
+ * the Broker and finalized features.
+ *
+ * NOTE: the update*() and clear*() APIs of this class should be used only for testing purposes.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange],
+                              @volatile var defaultFeatureMinVersionLevels: Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+    supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, defaultFeatureMinVersionLevels))
+    supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return          the default minimum version level for the feature if its defined.
+   *                  otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+    defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, newMinVersionLevels))
+    defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+    Features.finalizedFeatures(
+      supportedFeatures.features.asScala.map {
+        case(name, versionRange) => (
+          name, new FinalizedVersionRange(defaultMinVersionLevel(name), versionRange.max))
+      }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   *     supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities need to be checked for.
+   *
+   * @return            The subset of input features which are incompatible. If the returned object
+   *                    is empty, it means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+    BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, true)
+  }
+}
+
+object BrokerFeatures extends Logging {
+
+  def createDefault(): BrokerFeatures = {
+    // The arguments are currently empty, but, in the future as we define features we should
+    // populate the required values here.
+    new BrokerFeatures(emptySupportedFeatures, Map[String, Short]())
+  }
+
+  /**
+   * Returns true if any of the provided finalized features are incompatible with the provided
+   * supported features.
+   *
+   * @param supportedFeatures   The supported features to be compared
+   * @param finalizedFeatures   The finalized features to be compared
+   *
+   * @return                    - True if there are any feature incompatibilities found.
+   *                            - False otherwise.
+   */
+  def hasIncompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
+                              finalizedFeatures: Features[FinalizedVersionRange]): Boolean = {
+    !incompatibleFeatures(supportedFeatures, finalizedFeatures, false).empty
+  }
+
+  private def incompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
+                                   finalizedFeatures: Features[FinalizedVersionRange],
+                                   logIncompatibilities: Boolean): Features[FinalizedVersionRange] = {
+    val incompatibilities = finalizedFeatures.features.asScala.map {

Review comment:
       `incompatibleFeatures`?

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature
+ *    version level deprecation. This is how it works: in order to deprecate feature version levels,
+ *    in this map the default minimum version level of a feature can be set to a new value that's
+ *    higher than 1 (let's call this new_min_version_level). In doing so, the feature version levels
+ *    in the closed range: [1, latest_min_version_level - 1] get deprecated by the controller logic
+ *    that applies this map to persistent finalized feature state in ZK (this mutation happens
+ *    during controller election and during finalized feature updates via the
+ *    APIKeys.UPDATE_FINALIZED_FEATURES api).
+ *
+ * This class also provides APIs to check for incompatibilities between the features supported by
+ * the Broker and finalized features.
+ *
+ * NOTE: the update*() and clear*() APIs of this class should be used only for testing purposes.
+ */
+class BrokerFeatures private (@volatile var supportedFeatures: Features[SupportedVersionRange],
+                              @volatile var defaultFeatureMinVersionLevels: Map[String, Short]) {
+  require(BrokerFeatures.areFeatureMinVersionLevelsCompatible(
+    supportedFeatures, defaultFeatureMinVersionLevels))
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(newFeatures, defaultFeatureMinVersionLevels))
+    supportedFeatures = newFeatures
+  }
+
+  /**
+   * Returns the default minimum version level for a specific feature.
+   *
+   * @param feature   the name of the feature
+   *
+   * @return          the default minimum version level for the feature if its defined.
+   *                  otherwise, returns 1.
+   */
+  def defaultMinVersionLevel(feature: String): Short = {
+    defaultFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
+  // For testing only.
+  def setDefaultMinVersionLevels(newMinVersionLevels: Map[String, Short]): Unit = {
+    require(
+      BrokerFeatures.areFeatureMinVersionLevelsCompatible(supportedFeatures, newMinVersionLevels))
+    defaultFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  /**
+   * Returns the default finalized features that a new Kafka cluster with IBP config >= KAFKA_2_7_IV0
+   * needs to be bootstrapped with.
+   */
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {
+    Features.finalizedFeatures(
+      supportedFeatures.features.asScala.map {
+        case(name, versionRange) => (
+          name, new FinalizedVersionRange(defaultMinVersionLevel(name), versionRange.max))
+      }.asJava)
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the
+   *     supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities need to be checked for.
+   *
+   * @return            The subset of input features which are incompatible. If the returned object
+   *                    is empty, it means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+    BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, true)
+  }
+}
+
+object BrokerFeatures extends Logging {
+
+  def createDefault(): BrokerFeatures = {
+    // The arguments are currently empty, but, in the future as we define features we should
+    // populate the required values here.
+    new BrokerFeatures(emptySupportedFeatures, Map[String, Short]())
+  }
+
+  /**
+   * Returns true if any of the provided finalized features are incompatible with the provided
+   * supported features.
+   *
+   * @param supportedFeatures   The supported features to be compared
+   * @param finalizedFeatures   The finalized features to be compared
+   *
+   * @return                    - True if there are any feature incompatibilities found.
+   *                            - False otherwise.
+   */
+  def hasIncompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
+                              finalizedFeatures: Features[FinalizedVersionRange]): Boolean = {
+    !incompatibleFeatures(supportedFeatures, finalizedFeatures, false).empty
+  }
+
+  private def incompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
+                                   finalizedFeatures: Features[FinalizedVersionRange],
+                                   logIncompatibilities: Boolean): Features[FinalizedVersionRange] = {
+    val incompatibilities = finalizedFeatures.features.asScala.map {
+      case (feature, versionLevels) => {
+        val supportedVersions = supportedFeatures.get(feature)
+        if (supportedVersions == null) {
+          (feature, versionLevels, "{feature=%s, reason='Unsupported feature'}".format(feature))
+        } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+          (feature, versionLevels, "{feature=%s, reason='%s is incompatible with %s'}".format(
+            feature, versionLevels, supportedVersions))
+        } else {
+          (feature, versionLevels, null)
+        }
+      }
+    }.filter{ case(_, _, errorReason) => errorReason != null}.toList
+
+    if (logIncompatibilities && incompatibilities.nonEmpty) {
+      warn(
+        "Feature incompatibilities seen: " + incompatibilities.map{

Review comment:
       nit: this file uses both `map{` and `map {`. Could we pick one format and use it consistently within the current file?
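
For illustration, here is a minimal sketch of the single-space form (`map {`, `sortBy {`, `collect {`) applied consistently. The object name, the sample data, and the simplified max-level check are hypothetical stand-ins. They are not the PR's `Features` types or its `isIncompatibleWith` logic.

```scala
object MapSpacingExample {
  // Hypothetical stand-in data: feature name -> (minLevel, maxLevel).
  val finalized: Map[String, (Int, Int)] =
    Map("feature_a" -> (1, 3), "feature_b" -> (2, 5))

  // Hypothetical stand-in: feature name -> highest supported version.
  val supportedMax: Map[String, Int] = Map("feature_a" -> 3)

  // Every block-style call below uses exactly one space before the brace.
  def incompatibilityReasons: List[String] =
    finalized.toList.sortBy { case (name, _) => name }.map {
      case (name, (_, maxLevel)) =>
        supportedMax.get(name) match {
          case None =>
            Some(s"{feature=$name, reason='Unsupported feature'}")
          case Some(max) if maxLevel > max =>
            Some(s"{feature=$name, reason='level $maxLevel exceeds supported max $max'}")
          case _ =>
            None
        }
    }.collect { case Some(reason) => reason }
}
```

Either spacing would be acceptable as long as the file sticks to one. The sketch uses `map {` only because that form already appears in the earlier part of this diff.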



