Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 17:47:48 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r437145250



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +128,45 @@ public static ApiVersionsResponse fromStruct(Struct struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures) {
+        return apiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, Features.emptyFinalizedFeatures(), UNKNOWN_FINALIZED_FEATURES_EPOCH);
+    }
+
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Features<FinalizedVersionRange> finalizedFeatures,
+        int finalizedFeaturesEpoch) {
         if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) {
             return DEFAULT_API_VERSIONS_RESPONSE;
         }
-        return createApiVersionsResponse(throttleTimeMs, maxMagic);
+        return createApiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, finalizedFeatures, finalizedFeaturesEpoch);
     }
 
-    public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) {
+    public static ApiVersionsResponse createApiVersionsResponseWithEmptyFeatures(

Review comment:
      I overlooked this case; let's keep this static constructor without renaming it, since it is public.
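      For example, keeping the old name could look roughly like this (the body below simply reuses the createApiVersionsResponseWithEmptyFeatures variant shown further down in this diff):

```java
public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) {
    // Keep the existing public name and delegate to the new, fully-parameterized factory
    // with empty feature metadata, so current callers keep compiling unchanged.
    return createApiVersionsResponse(
        throttleTimeMs,
        minMagic,
        Features.emptySupportedFeatures(),
        Features.emptyFinalizedFeatures(),
        UNKNOWN_FINALIZED_FEATURES_EPOCH);
}
```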

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being <VersionRangeType>.
+ * Also provides API to convert the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features<VersionRangeType extends BaseVersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to a type of VersionRange.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        Objects.requireNonNull(features,"Provided features can not be null.");
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to SupportedVersionRange.
+     *
+     * @return           Returns a new Features object representing supported features.
+     */
+    public static Features<SupportedVersionRange> supportedFeatures(Map<String, SupportedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to FinalizedVersionRange.
+     *
+     * @return           Returns a new Features object representing finalized features.
+     */
+    public static Features<FinalizedVersionRange> finalizedFeatures(Map<String, FinalizedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    // Visible for testing.
+    public static Features<FinalizedVersionRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<SupportedVersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public Map<String, VersionRangeType> features() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    /**
+     * @param  feature   name of the feature
+     *
+     * @return           the VersionRangeType corresponding to the feature name, or null if the
+     *                   feature is absent
+     */
+    public VersionRangeType get(String feature) {
+        return features.get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   A map representation of the underlying features. The returned value can be converted
+     *           back to Features using one of the from*FeaturesMap() APIs of this class.
+     */
+    public Map<String, Map<String, Short>> toMap() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().toMap()));
+    }
+
+    /**
+     * An interface that defines behavior to convert from a Map to an object of type BaseVersionRange.
+     */
+    private interface MapToBaseVersionRangeConverter<V extends BaseVersionRange> {
+
+        /**
+         * Convert the map representation of an object of type <V>, to an object of type <V>.
+         *
+         * @param  baseVersionRangeMap   the map representation of a BaseVersionRange object.
+         *
+         * @return                       the object of type <V>
+         */
+        V fromMap(Map<String, Short> baseVersionRangeMap);
+    }
+
+    private static <V extends BaseVersionRange> Features<V> fromFeaturesMap(
+        Map<String, Map<String, Short>> featuresMap, MapToBaseVersionRangeConverter<V> converter) {
+        return new Features<>(featuresMap.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> converter.fromMap(entry.getValue()))));
+    }
+
+    /**
+     * Converts from a map to Features<FinalizedVersionRange>.
+     *
+     * @param featuresMap  the map representation of a Features<FinalizedVersionRange> object,
+     *                     generated using the toMap() API.
+     *
+     * @return             the Features<FinalizedVersionRange> object
+     */
+    public static Features<FinalizedVersionRange> fromFinalizedFeaturesMap(
+        Map<String, Map<String, Short>> featuresMap) {
+        return fromFeaturesMap(featuresMap, FinalizedVersionRange::fromMap);
+    }
+
+    /**
+     * Converts from a map to Features<SupportedVersionRange>.
+     *
+     * @param featuresMap  the map representation of a Features<SupportedVersionRange> object,
+     *                     generated using the toMap() API.
+     *
+     * @return             the Features<SupportedVersionRange> object
+     */
+    public static Features<SupportedVersionRange> fromSupportedFeaturesMap(
+        Map<String, Map<String, Short>> featuresMap) {
+        return fromFeaturesMap(featuresMap, SupportedVersionRange::fromMap);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || !(other instanceof Features)) {

Review comment:
      we don't need to check `other == null` here; the `instanceof` check in the next condition already covers it.
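      For example, a minimal sketch of the simplified check (the field comparison at the end is my guess at the rest of the method, which is cut off in this diff):

```java
@Override
public boolean equals(Object other) {
    if (this == other) {
        return true;
    }
    // null instanceof Features is always false, so a separate null check is redundant.
    if (!(other instanceof Features)) {
        return false;
    }
    final Features<?> that = (Features<?>) other;
    return Objects.equals(this.features, that.features);
}
```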

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+import scala.util.control.Exception.ignoring
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked exactly
+     * once successfully. A subsequent invocation will raise an exception.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided in the constructor, and
+     *           this method is called again after a successful previous invocation.
+     * @throws   FeatureCacheUpdateException, if there was an error in updating the
+     *           FinalizedFeatureCache.
+     * @throws   RuntimeException, if there was a failure in reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      debug(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown version is returned only when the
+      //                                           ZK node is absent. Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        featureZNode.status match {
+          case FeatureZNodeStatus.Disabled => {
+            info(s"Feature ZK node at path: $featureZkNodePath is in disabled status.")
+            FinalizedFeatureCache.clear()
+          }
+          case FeatureZNodeStatus.Enabled => {
+            FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)

Review comment:
      Being a bit paranoid here: would it be possible to get out-of-order updates from ZK, such that the version number is not monotonically increasing? Also, even though we could throw in FinalizedFeatureCache, do we really want to kill the broker, or should we just log a warning and proceed?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max, each of type short.
+ * The min and max attributes need to satisfy 2 rules:
+ *  - they are each expected to be >= 1, as we only consider positive version values to be valid.
+ *  - max should be >= min.
+ *
+ * The class also provides API to convert the version range to a map.
+ * The class allows for configurable labels for the min/max attributes, which can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {
+    // Non-empty label for the min version key, that's used only to convert to/from a map.
+    private final String minKeyLabel;
+
+    // The value of the minimum version.
+    private final short minValue;
+
+    // Non-empty label for the max version key, that's used only to convert to/from a map.
+    private final String maxKeyLabel;
+
+    // The value of the maximum version.
+    private final short maxValue;
+
+    /**
+     * Raises an exception unless the following condition is met:
+     * minValue >= 1 and maxValue >= 1 and maxValue >= minValue.
+     *
+     * @param minKeyLabel   Label for the min version key, that's used only to convert to/from a map.
+     * @param minValue      The minimum version value.
+     * @param maxKeyLabel   Label for the max version key, that's used only to convert to/from a map.
+     * @param maxValue      The maximum version value.
+     *
+     * @throws IllegalArgumentException   If any of the following conditions are true:
+     *                                     - (minValue < 1) OR (maxValue < 1) OR (maxValue < minValue).
+     *                                     - minKeyLabel is empty, OR, maxKeyLabel is empty.
+     */
+    protected BaseVersionRange(String minKeyLabel, short minValue, String maxKeyLabel, short maxValue) {
+        if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received" +
+                    " minValue: %d, maxValue: %d", minValue, maxValue));
+        }
+        if (minKeyLabel.isEmpty()) {
+            throw new IllegalArgumentException("Expected minKeyLabel to be non-empty.");
+        }
+        if (maxKeyLabel.isEmpty()) {
+            throw new IllegalArgumentException("Expected maxKeyLabel to be non-empty.");
+        }
+        this.minKeyLabel = minKeyLabel;
+        this.minValue = minValue;
+        this.maxKeyLabel = maxKeyLabel;
+        this.maxValue = maxValue;
+    }
+
+    public short min() {
+        return minValue;
+    }
+
+    public short max() {
+        return maxValue;
+    }
+
+    public String toString() {
+        return String.format("%s[%s:%d, %s:%d]", this.getClass().getSimpleName(), this.minKeyLabel, min(), this.maxKeyLabel, max());
+    }
+
+    public Map<String, Short> toMap() {
+        return new HashMap<String, Short>() {

Review comment:
       nit: we could use Utils.mkMap here
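      Something along these lines, assuming static imports of `Utils.mkMap` and `Utils.mkEntry` (a sketch of the idea, not the exact change):

```java
public Map<String, Short> toMap() {
    // Build the two-entry map directly instead of instantiating an anonymous HashMap subclass.
    return mkMap(mkEntry(minKeyLabel, min()), mkEntry(maxKeyLabel, max()));
}
```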

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +781,165 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes)
 }
 
+/**
+ * Represents the status of the FeatureZNode.
+ *
+ * Enabled  -> This status means the feature versioning system (KIP-584) is enabled, and, the
+ *             finalized features stored in the FeatureZNode are active. This status is written by
+ *             the controller to the FeatureZNode only when the broker IBP config is greater than
+ *             or equal to KAFKA_2_7_IV0.
+ *
+ * Disabled -> This status means the feature versioning system (KIP-584) is disabled, and, the
+ *             finalized features stored in the FeatureZNode are not relevant. This status is
+ *             written by the controller to the FeatureZNode only when the broker IBP config
+ *             is less than KAFKA_2_7_IV0.
+ *
+ * The purpose behind the FeatureZNodeStatus is that it helps differentiate between the following
+ * cases:
+ *
+ * 1. New cluster bootstrap:
+ *    For a new Kafka cluster (i.e. it is deployed first time), we would like to start the cluster
+ *    with all the possible supported features finalized immediately. The new cluster will almost
+ *    never be started with an old IBP config that’s less than KAFKA_2_7_IV0. In such a case, the
+ *    controller will start up and notice that the FeatureZNode is absent in the new cluster.
+ *    To handle the requirement, the controller will create a FeatureZNode (with enabled status)
+ *    containing the entire list of supported features as its finalized features.
+ *
+ * 2. Cluster upgrade:
+ *    Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, but
+ *    the Broker binary has been upgraded to a state where it supports the feature versioning
+ *    system (KIP-584). This means the user is upgrading from an earlier version of the Broker
+ *    binary. In this case, we want to start with no finalized features and allow the user to enable
+ *    them whenever they are ready i.e. in the future whenever the user sets IBP config
+ *    to be greater than or equal to KAFKA_2_7_IV0. The reason is that enabling all the possible
+ *    features immediately after an upgrade could be harmful to the cluster.
+ *    In such a case:
+ *      - Before the Broker upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the controller
+ *        will start up and check if the FeatureZNode is absent. If true, then it will react by
+ *        creating a FeatureZNode with disabled status and empty features.
+ *      - After the Broker upgrade (i.e. IBP config set to greater than or equal to KAFKA_2_7_IV0),
+ *        when the controller starts up it will check if the FeatureZNode exists and whether it is
+ *        disabled. In such a case, it won’t upgrade all features immediately. Instead it will just
+ *        switch the FeatureZNode status to enabled status. This lets the user finalize the features
+ *        later.
+ *
+ * 3. Cluster downgrade:
+ *    Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to
+ *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a
+ *    value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning
+ *    system (KIP-584). In this case, when the controller starts up with the lower IBP config, it
+ *    will switch the FeatureZNode status to disabled with empty features.
+ */
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+/**
+ * Represents the contents of the ZK node containing finalized feature information.
+ *
+ * @param status     the status of the ZK node
+ * @param features   the cluster-wide finalized features
+ */
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {
+  private val VersionKey = "version"
+  private val StatusKey = "status"
+  private val FeaturesKey = "features"
+
+  // V1 contains 'version', 'status' and 'features' keys.
+  val V1 = 1
+  val CurrentVersion = V1
+
+  def path = "/feature"
+
+  def asJavaMap(scalaMap: Map[String, Map[String, Short]]): util.Map[String, util.Map[String, java.lang.Short]] = {
+    scalaMap
+      .view.mapValues(_.view.mapValues(scalaShort => java.lang.Short.valueOf(scalaShort)).toMap.asJava)
+      .toMap
+      .asJava
+  }
+
+  /**
+   * Encodes a FeatureZNode to JSON.
+   *
+   * @param featureZNode   FeatureZNode to be encoded
+   *
+   * @return               JSON representation of the FeatureZNode, as an Array[Byte]
+   */
+  def encode(featureZNode: FeatureZNode): Array[Byte] = {
+    val jsonMap = collection.mutable.Map(
+      VersionKey -> CurrentVersion,
+      StatusKey -> featureZNode.status.id,
+      FeaturesKey -> featureZNode.features.toMap)
+    Json.encodeAsBytes(jsonMap.asJava)
+  }
+
+  /**
+   * Decodes the contents of the feature ZK node from Array[Byte] to a FeatureZNode.
+   *
+   * @param jsonBytes   the contents of the feature ZK node
+   *
+   * @return            the FeatureZNode created from jsonBytes
+   *
+   * @throws IllegalArgumentException   if the Array[Byte] can not be decoded.
+   */
+  def decode(jsonBytes: Array[Byte]): FeatureZNode = {
+    Json.tryParseBytes(jsonBytes) match {
+      case Right(js) =>
+        val featureInfo = js.asJsonObject
+        val version = featureInfo(VersionKey).to[Int]
+        if (version < V1) {
+          throw new IllegalArgumentException(s"Unsupported version: $version of feature information: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+
+        val featuresMap = featureInfo
+          .get(FeaturesKey)
+          .flatMap(_.to[Option[Map[String, Map[String, Int]]]])
+
+        if (featuresMap.isEmpty) {
+          throw new IllegalArgumentException("Features map can not be absent in: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+        val features = asJavaMap(
+          featuresMap
+            .map(theMap => theMap.view.mapValues(_.view.mapValues(_.asInstanceOf[Short]).toMap).toMap)
+            .getOrElse(Map[String, Map[String, Short]]()))
+
+        val statusInt = featureInfo
+          .get(StatusKey)
+          .flatMap(_.to[Option[Int]])
+        if (statusInt.isEmpty) {
+          throw new IllegalArgumentException("Status can not be absent in feature information: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+        val status = FeatureZNodeStatus.withNameOpt(statusInt.get)
+        if (status.isEmpty) {
+          throw new IllegalArgumentException(
+            s"Malformed status: $statusInt  found in feature information: ${new String(jsonBytes, UTF_8)}")

Review comment:
      nit: extra space before `found` in the message

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
+class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Int) {
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A common mutable cache containing the latest finalized features and epoch. By default the contents of
+ * the cache are empty. This cache needs to be populated at least once for it's contents to become

Review comment:
       it's -> its

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +128,45 @@ public static ApiVersionsResponse fromStruct(Struct struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures) {
+        return apiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, Features.emptyFinalizedFeatures(), UNKNOWN_FINALIZED_FEATURES_EPOCH);
+    }
+
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Features<FinalizedVersionRange> finalizedFeatures,
+        int finalizedFeaturesEpoch) {
         if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) {
             return DEFAULT_API_VERSIONS_RESPONSE;
         }
-        return createApiVersionsResponse(throttleTimeMs, maxMagic);
+        return createApiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, finalizedFeatures, finalizedFeaturesEpoch);
     }
 
-    public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) {
+    public static ApiVersionsResponse createApiVersionsResponseWithEmptyFeatures(
+        int throttleTimeMs,
+        final byte minMagic) {
+        return createApiVersionsResponse(
+            throttleTimeMs,
+            minMagic,
+            Features.emptySupportedFeatures(),
+            Features.emptyFinalizedFeatures(),
+            UNKNOWN_FINALIZED_FEATURES_EPOCH);
+    }
+
+    public static ApiVersionsResponse createApiVersionsResponse(
+        int throttleTimeMs,

Review comment:
       nit: should all the parameters be final here, not just minMagic?

##########
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##########
@@ -149,6 +153,53 @@ class BrokerEndPointTest {
     assertEquals(None, broker.rack)
   }
 
+  @Test
+  def testFromJsonV4WithNoFeatures(): Unit = {

Review comment:
      V4 doesn't have features, right? What's the purpose of this test?

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -98,7 +98,9 @@ object ApiVersion {
     // No new APIs, equivalent to 2.4-IV1
     KAFKA_2_5_IV0,
     // Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570)
-    KAFKA_2_6_IV0
+    KAFKA_2_6_IV0,
+    // Introduce feature versioning support (KIP-584)

Review comment:
      nit: use `Introduced` to align with the previous comment?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+import scala.util.control.Exception.ignoring
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked exactly
+     * once successfully. A subsequent invocation will raise an exception.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided in the constructor, and
+     *           this method is called again after a successful previous invocation.
+     * @throws   FeatureCacheUpdateException, if there was an error in updating the
+     *           FinalizedFeatureCache.
+     * @throws   RuntimeException, if there was a failure in reading/deserializing the

Review comment:
      Sorry, it's been a while since my last review, but have we discussed the recovery path when we hit a data corruption exception for the cluster? Is there a way to turn off feature versioning completely to unblock, or do we have a mechanism to wipe out the ZK data?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
+class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Int) {
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A common mutable cache containing the latest finalized features and epoch. By default the contents of
+ * the cache are empty. This cache needs to be populated at least once for it's contents to become
+ * non-empty. Currently the main reader of this cache is the read path that serves an ApiVersionsRequest,
+ * returning the features information in the response.
+ *
+ * @see FinalizedFeatureChangeListener
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means

Review comment:
      could be simplified to `the latest known FinalizedFeaturesAndEpoch, or empty if not defined in the cache`

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {

Review comment:
      I'm no longer insisting on this point, as we could handle it as follow-up work. Filed a JIRA here: https://issues.apache.org/jira/browse/KAFKA-10130

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+import scala.util.control.Exception.ignoring
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable

Review comment:
      I think we could remove `If the cache update is not successful, then, a suitable exception is raised...`, which is pretty obvious.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+import scala.util.control.Exception.ignoring
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked exactly
+     * once successfully. A subsequent invocation will raise an exception.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided in the constructor, and
+     *           this method is called again after a successful previous invocation.
+     * @throws   FeatureCacheUpdateException, if there was an error in updating the
+     *           FinalizedFeatureCache.
+     * @throws   RuntimeException, if there was a failure in reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      debug(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown version is returned only when the
+      //                                           ZK node is absent. Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        featureZNode.status match {
+          case FeatureZNodeStatus.Disabled => {
+            info(s"Feature ZK node at path: $featureZkNodePath is in disabled status.")
+            FinalizedFeatureCache.clear()
+          }
+          case FeatureZNodeStatus.Enabled => {
+            FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+          }
+          case _ => throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully. This method returns
+     * immediately if an updateLatestOrThrow call had already completed successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             TimeoutException if the wait can not be completed in waitTimeMs
+     *                     milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (!notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        ignoring(classOf[InterruptedException]) {
+          queue.take.updateLatestOrThrow()
+        }
+      } catch {
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker will eventually exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: ZK corruption or operational error).
+      // In such a case, we prefer to just log a warning and treat the case as if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      queue.add(new FeatureCacheUpdater(path))
+    }
+  }
+
+  object ZkStateChangeHandler extends StateChangeHandler {
+    val path: String = FeatureZNode.path
+
+    override val name: String = path
+
+    override def afterInitializingSession(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the feature ZK node
+   * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker
+   * will exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature cache to be updated once.
+   *                                   If this parameter <= 0, no wait operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    if (waitOnceForCacheUpdateMs <= 0) {
+      throw new IllegalArgumentException(
+        s"Expected waitOnceForCacheUpdateMs > 0, but provided: $waitOnceForCacheUpdateMs")
+    }
+
+    thread.start()
+    zkClient.registerStateChangeHandler(ZkStateChangeHandler)
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
+    val ensureCacheUpdateOnce = new FeatureCacheUpdater(
+      FeatureZNodeChangeHandler.path, Some(new CountDownLatch(1)))
+    queue.add(ensureCacheUpdateOnce)
+    try {
+      ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs)
+    } catch {
+      case e: Exception => {

Review comment:
      nit: the `{}` braces are not necessary here

##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -755,7 +755,7 @@ class KafkaConfigTest {
         case KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp =>
         case KafkaConfig.SaslLoginRefreshBufferSecondsProp =>
 
-        // Security config
+        //Security config

Review comment:
       add the space back

##########
File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
##########
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server

Review comment:
      nit: we could add a minor test to verify that a non-positive `waitOnceForCacheUpdateMs` will throw

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Short>> emptyMap = new HashMap<>();
+
+        Features<FinalizedVersionRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures();
+        assertTrue(emptyFinalizedFeatures.features().isEmpty());
+        assertTrue(emptyFinalizedFeatures.toMap().isEmpty());
+        assertEquals(emptyFinalizedFeatures, Features.fromFinalizedFeaturesMap(emptyMap));
+
+        Features<SupportedVersionRange> emptySupportedFeatures = Features.emptySupportedFeatures();
+        assertTrue(emptySupportedFeatures.features().isEmpty());
+        assertTrue(emptySupportedFeatures.toMap().isEmpty());
+        assertEquals(emptySupportedFeatures, Features.fromSupportedFeaturesMap(emptyMap));
+    }
+
+    @Test
+    public void testNullFeatures() {
+        assertThrows(
+            NullPointerException.class,
+            () -> Features.finalizedFeatures(null));
+        assertThrows(
+            NullPointerException.class,
+            () -> Features.supportedFeatures(null));
+    }
+
+    @Test
+    public void testGetAllFeaturesAPI() {
+        SupportedVersionRange v1 = new SupportedVersionRange((short) 1, (short) 2);
+        SupportedVersionRange v2 = new SupportedVersionRange((short) 3, (short) 4);
+        Map<String, SupportedVersionRange> allFeatures =
+            mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+        Features<SupportedVersionRange> features = Features.supportedFeatures(allFeatures);
+        assertEquals(allFeatures, features.features());
+    }
+
+    @Test
+    public void testGetAPI() {
+        SupportedVersionRange v1 = new SupportedVersionRange((short) 1, (short) 2);
+        SupportedVersionRange v2 = new SupportedVersionRange((short) 3, (short) 4);
+        Map<String, SupportedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+        Features<SupportedVersionRange> features = Features.supportedFeatures(allFeatures);
+        assertEquals(v1, features.get("feature_1"));
+        assertEquals(v2, features.get("feature_2"));
+        assertNull(features.get("nonexistent_feature"));
+    }
+
+    @Test
+    public void testFromFeaturesMapToFeaturesMap() {
+        SupportedVersionRange v1 = new SupportedVersionRange((short) 1, (short) 2);
+        SupportedVersionRange v2 = new SupportedVersionRange((short) 3, (short) 4);
+        Map<String, SupportedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+
+        Features<SupportedVersionRange> features = Features.supportedFeatures(allFeatures);
+
+        Map<String, Map<String, Short>> expected = mkMap(
+            mkEntry("feature_1", mkMap(mkEntry("min_version", (short) 1), mkEntry("max_version", (short) 2))),
+            mkEntry("feature_2", mkMap(mkEntry("min_version", (short) 3), mkEntry("max_version", (short) 4))));
+        assertEquals(expected, features.toMap());
+        assertEquals(features, Features.fromSupportedFeaturesMap(expected));
+    }
+
+    @Test
+    public void testFromToFinalizedFeaturesMap() {
+        FinalizedVersionRange v1 = new FinalizedVersionRange((short) 1, (short) 2);
+        FinalizedVersionRange v2 = new FinalizedVersionRange((short) 3, (short) 4);
+        Map<String, FinalizedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+
+        Features<FinalizedVersionRange> features = Features.finalizedFeatures(allFeatures);
+
+        Map<String, Map<String, Short>> expected = mkMap(
+            mkEntry("feature_1", mkMap(mkEntry("min_version_level", (short) 1), mkEntry("max_version_level", (short) 2))),
+            mkEntry("feature_2", mkMap(mkEntry("min_version_level", (short) 3), mkEntry("max_version_level", (short) 4))));
+        assertEquals(expected, features.toMap());
+        assertEquals(features, Features.fromFinalizedFeaturesMap(expected));
+    }
+
+    @Test
+    public void testToStringFinalizedFeatures() {
+        FinalizedVersionRange v1 = new FinalizedVersionRange((short) 1, (short) 2);
+        FinalizedVersionRange v2 = new FinalizedVersionRange((short) 3, (short) 4);
+        Map<String, FinalizedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+
+        Features<FinalizedVersionRange> features = Features.finalizedFeatures(allFeatures);
+
+        assertEquals(
+            "Features{(feature_1 -> FinalizedVersionRange[min_version_level:1, max_version_level:2]), (feature_2 -> FinalizedVersionRange[min_version_level:3, max_version_level:4])}",
+            features.toString());
+    }
+
+    @Test
+    public void testToStringSupportedFeatures() {
+        SupportedVersionRange v1 = new SupportedVersionRange((short) 1, (short) 2);
+        SupportedVersionRange v2 = new SupportedVersionRange((short) 3, (short) 4);
+        Map<String, SupportedVersionRange> allFeatures
+            = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+
+        Features<SupportedVersionRange> features = Features.supportedFeatures(allFeatures);
+
+        assertEquals(
+            "Features{(feature_1 -> SupportedVersionRange[min_version:1, max_version:2]), (feature_2 -> SupportedVersionRange[min_version:3, max_version:4])}",
+            features.toString());
+    }
+
+    @Test
+    public void testSupportedFeaturesFromMapFailure() {
+        // This is invalid because 'max_version' key is missing.

Review comment:
      nit: instead of using comments, it is better to build this into the test name, for example:
   `testInvalidSupportedFeaturesWithMissingMaxVersion`
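      For instance, a rough sketch of that renamed test (the assertion is a guess here, since the original test body is truncated in this diff; I'm assuming the parse failure surfaces as an IllegalArgumentException):

```java
@Test
public void testInvalidSupportedFeaturesWithMissingMaxVersion() {
    // 'max_version' is intentionally absent, so parsing the map is expected to fail.
    Map<String, Map<String, Short>> featuresMap = mkMap(
        mkEntry("feature_1", mkMap(mkEntry("min_version", (short) 1))));
    // Assumption: the missing key is reported as an IllegalArgumentException.
    assertThrows(
        IllegalArgumentException.class,
        () -> Features.fromSupportedFeaturesMap(featuresMap));
}
```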




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org