You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/10 16:34:14 UTC

[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r467279790



##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+        "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."},
+      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
       The KIP wiki has AllowDowngrade at the topic level. Could we update that?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises
+   * support for. Each broker advertises the version ranges of its own supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
+   * specified range of version levels. Also, the controller is the only entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all
+   *    the possible supported features finalized immediately. Assuming this is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier version of the broker
+   *    binary. In this case, we want to start with no finalized features and allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the
+   *    features. This process ensures we do not enable all the possible features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty finalized features.
+   *        Otherwise, if a node already exists in enabled status then the controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already present with enabled
+   *    status and existing finalized features. In such a case, the controller needs to scan the
+   *    existing finalized features and mutate them for the purpose of version level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is present in the default
+   *    finalized features, then, its existing minimum version level is updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range of feature version
+   *    levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, which requires a major
+   *    release of Kafka. In such a release, the minimum version level maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.getDefaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map {
+          case (featureName, existingVersionRange) =>
+            val brokerDefaultVersionRange = defaultFinalizedFeatures.get(featureName)
+            if (brokerDefaultVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with $existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (brokerDefaultVersionRange.max() >= existingVersionRange.max() &&
+                       brokerDefaultVersionRange.min() <= existingVersionRange.max()) {
+              // Using the change below, we deprecate all version levels in the range:
+              // [existingVersionRange.min(), brokerDefaultVersionRange.min() - 1].
+              //
+              // NOTE: if existingVersionRange.min() equals brokerDefaultVersionRange.min(), then
+              // we do not deprecate any version levels (since there is none to be deprecated).
+              //
+              // Examples:
+              // 1. brokerDefaultVersionRange = [4, 7] and existingVersionRange = [1, 5].
+              //    In this case, we deprecate all version levels in the range: [1, 3].
+              // 2. brokerDefaultVersionRange = [4, 7] and existingVersionRange = [4, 5].
+              //    In this case, we do not deprecate any version levels since
+              //    brokerDefaultVersionRange.min() equals existingVersionRange.min().
+              (featureName, new FinalizedVersionRange(brokerDefaultVersionRange.min(), existingVersionRange.max()))

Review comment:
       When we roll the cluster to bump up IBP, it seems that it's possible for the min of finalized version to flip repeatedly? This can be a bit weird.
   
   Also, it seems that we should set min version based on the largest min version across all brokers?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -82,18 +110,54 @@ object FinalizedFeatureCache extends Logging {
         " The existing cache contents are %s").format(latest, oldFeatureAndEpoch)
       throw new FeatureCacheUpdateException(errorMsg)
     } else {
-      val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features)
+      val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features)
       if (!incompatibleFeatures.empty) {
         val errorMsg = ("FinalizedFeatureCache update failed since feature compatibility" +
           " checks failed! Supported %s has incompatibilities with the latest %s."
-          ).format(SupportedFeatures.get, latest)
+          ).format(brokerFeatures.supportedFeatures, latest)

Review comment:
       If the broker discovers that it's incompatible, should it just shut itself down?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises
+   * support for. Each broker advertises the version ranges of its own supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
+   * specified range of version levels. Also, the controller is the only entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater than or equal to
+   * KAFKA_2_7_IV0.

Review comment:
       When we roll the cluster to bump up IBP, it seems that it's possible for status to be enabled and then disabled repeatedly? This can be a bit weird.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -977,14 +1179,30 @@ class KafkaController(val config: KafkaConfig,
 
   /**
    * Send the leader information for selected partitions to selected brokers so that they can correctly respond to
-   * metadata requests
+   * metadata requests. Particularly, when feature versioning is enabled, we filter out brokers with incompatible
+   * features from receiving the metadata requests. This is because we do not want to activate incompatible brokers,
+   * as these may have harmful consequences to the cluster.

Review comment:
       Hmm, do we need to do this? If there is an incompatible feature, the broker will realize that and can just shut itself down.

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
+      "about": "Results for each feature update.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,

Review comment:
       The KIP wiki doesn't include this field.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
+    }
+
+    val incompatibilityError = "Could not apply finalized feature update because" +
+      " brokers were found to have incompatible versions for the feature."
+
+    if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST, incompatibilityError))
+    } else {
+      val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature)
+      val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel)

Review comment:
       If update.maxVersionLevel < defaultMinVersionLevel, we throw an IllegalStateException. Should we catch it and convert it to an error code?

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map enables feature

Review comment:
       Could you explain how the default min version is different from the min in supportedFeatures?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
+    }
+
+    val incompatibilityError = "Could not apply finalized feature update because" +
+      " brokers were found to have incompatible versions for the feature."
+
+    if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {

Review comment:
       Since we are doing the compatibility check for every broker, do we need to special case here just for the broker feature on the controller?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Integer> finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(final Features<FinalizedVersionRange> finalizedFeatures,
+                           final int finalizedFeaturesEpoch,
+                           final Features<SupportedVersionRange> supportedFeatures) {
+        Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures can not be null.");
+        Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures can not be null.");
+        this.finalizedFeatures = finalizedFeatures;
+        if (finalizedFeaturesEpoch >= 0) {
+            this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+        } else {
+            this.finalizedFeaturesEpoch = Optional.empty();
+        }
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    /**
+     * A map of finalized feature versions, with key being finalized feature name and value
+     * containing the min/max version levels for the finalized feature.
+     */
+    public Features<FinalizedVersionRange> finalizedFeatures() {

Review comment:
       The return type is different from the KIP. Which one is correct? Since this is a public interface, in general, we don't want to expose anything other than truly necessary. This PR seems to expose a lot more public methods to the user.
   
   FinalizedVersionRange is in org.apache.kafka.common.feature. Currently, all public interfaces are specified under javadoc in build.gradle. So, we need to either include that package in javadoc or move it to a public package.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Features<FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Integer> finalizedFeaturesEpoch;
+
+    private final Features<SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(final Features<FinalizedVersionRange> finalizedFeatures,
+                           final int finalizedFeaturesEpoch,
+                           final Features<SupportedVersionRange> supportedFeatures) {
+        Objects.requireNonNull(finalizedFeatures, "Provided finalizedFeatures can not be null.");
+        Objects.requireNonNull(supportedFeatures, "Provided supportedFeatures can not be null.");
+        this.finalizedFeatures = finalizedFeatures;
+        if (finalizedFeaturesEpoch >= 0) {
+            this.finalizedFeaturesEpoch = Optional.of(finalizedFeaturesEpoch);
+        } else {
+            this.finalizedFeaturesEpoch = Optional.empty();
+        }
+        this.supportedFeatures = supportedFeatures;
+    }
+
+    /**
+     * A map of finalized feature versions, with key being finalized feature name and value
+     * containing the min/max version levels for the finalized feature.
+     */
+    public Features<FinalizedVersionRange> finalizedFeatures() {
+        return finalizedFeatures;
+    }
+
+    /**
+     * The epoch for the finalized features.
+     * If the returned value is empty, it means the finalized features are absent/unavailable.
+     */
+    public Optional<Integer> finalizedFeaturesEpoch() {

Review comment:
       The return type is different from the KIP. Which one is correct?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is particularly useful
+ * to hold the result returned by the {@link Admin#describeFeatures(DescribeFeaturesOptions)} API.

Review comment:
       The KIP also exposes host() and port(). Are they still needed?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    /**
+     * Applies specified updates to finalized features. This operation is not transactional so it
+     * may succeed for some features while fail for others.
+     * <p>
+     * The API takes in a map of finalized feature names to {@link FeatureUpdate} that needs to be
+     * applied. Each entry in the map specifies the finalized feature to be added or updated or
+     * deleted, along with the new max feature version level value. This request is issued only to
+     * the controller since the API is only served by the controller. The return value contains an
+     * error code for each supplied {@link FeatureUpdate}, and the code indicates if the update
+     * succeeded or failed in the controller.
+     * <ul>
+     * <li>Downgrade of feature version level is not a regular operation/intent. It is only allowed
+     * in the controller if the {@link FeatureUpdate} has the allowDowngrade flag set - setting this
+     * flag conveys user intent to attempt downgrade of a feature max version level. Note that
+     * despite the allowDowngrade flag being set, certain downgrades may be rejected by the
+     * controller if it is deemed impossible.</li>
+     * <li>Deletion of a finalized feature version is not a regular operation/intent. It could be
+     * done by setting the allowDowngrade flag to true in the {@link FeatureUpdate}, and, setting
+     * the max version level to be less than 1.</li>
+     * </ul>
+     *<p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures
+     * obtained from the returned {@link UpdateFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have alter access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+     *   If the request details are invalid. e.g., a non-existing finalized feature is attempted
+     *   to be deleted or downgraded.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the updates could finish. It cannot be guaranteed whether
+     *   the updates succeeded or not.</li>
+     *   <li>{@link FeatureUpdateFailedException}
+     *   If the updates could not be applied on the controller, despite the request being valid.
+     *   This may be a temporary problem.</li>
+     * </ul>
+     * <p>
+     * This operation is supported by brokers with version 2.7.0 or higher.
+
+     * @param featureUpdates   the map of finalized feature name to {@link FeatureUpdate}
+     * @param options          the options to use
+     *
+     * @return                 the {@link UpdateFeaturesResult} containing the result
+     */
+    UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

Review comment:
       Again, this method has a different signature from the KIP.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1216,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the request is issued to any
+     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
       The KIP doesn't have DescribeFeaturesOptions. If we are changing the KIP, could we summarize the list of the things that are changed?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider();
+
+        Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    future.completeExceptionally(
+                        Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(
+        final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) {
+        if (featureUpdates == null || featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be null or empty.");
+        }
+        Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null");
+
+        final UpdateFeaturesRequestData request = UpdateFeaturesRequest.create(featureUpdates);
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
+        for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
+            updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+        }
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                return new UpdateFeaturesRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                // Check for controller change.
+                for (UpdatableFeatureResult result : response.data().results()) {
+                    final Errors error = Errors.forCode(result.errorCode());
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                        throw error.exception();

Review comment:
       handleNotControllerError() already throws an exception.
   
   Should other errors like CLUSTER_AUTHORIZATION_FAILED be treated in the same way?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesResult.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#updateFeatures(Map, UpdateFeaturesOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class UpdateFeaturesResult {
+    private final Map<String, KafkaFuture<Void>> futures;
+
+    /**
+     * @param futures   a map from feature names to future, which can be used to check the status of
+     *                  individual feature updates.
+     */
+    public UpdateFeaturesResult(final Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<String, KafkaFuture<Void>> values() {

Review comment:
       The KIP doesn't have this method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org