You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/21 04:27:31 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r428370254



##########
File path: clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {

Review comment:
       Should we add a unit test class for `BaseVersionRange`?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features<VersionRangeType extends BaseVersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<SupportedVersionRange> supportedFeatures(Map<String, SupportedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to FinalizedVersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<FinalizedVersionRange> finalizedFeatures(Map<String, FinalizedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    // Visible for testing.
+    public static Features<FinalizedVersionRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<SupportedVersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public Map<String, VersionRangeType> features() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    /**
+     * @param  feature   name of the feature
+     *
+     * @return           the VersionRangeType corresponding to the feature name, or null if absent
+     */
+    public VersionRangeType get(String feature) {
+        return features.get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   A map with underlying features serialized. The returned value can be deserialized
+     *           using one of the deserialize* APIs.

Review comment:
       Should this reference `deserialize()` rather than "one of the deserialize* APIs"? That said, I think the second sentence is redundant.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of {@link BaseVersionRange} representing a range of version levels.
+ * NOTE: This is the backing class used to define the min/max version levels for finalized features.
+ */
+public class FinalizedVersionRange extends BaseVersionRange {
+    // Label for the min version key, that's used only for serialization/deserialization purposes.
+    private static final String MIN_VERSION_LEVEL_KEY_LABEL = "min_version_level";
+
+    // Label for the max version key, that's used only for serialization/deserialization purposes.
+    private static final String MAX_VERSION_LEVEL_KEY_LABEL = "max_version_level";
+
+    public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) {
+        super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel);
+    }
+
+    public static FinalizedVersionRange deserialize(Map<String, Long> serialized) {
+        return new FinalizedVersionRange(
+            BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, serialized),
+            BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, serialized));
+    }
+
+    private boolean isCompatibleWith(BaseVersionRange versionRange) {

Review comment:
       The parameter type should be `SupportedVersionRange`.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;

Review comment:
       Yeah, the reasoning is that `get` blindly looks up inside `features`, so a null map is not valid here. And I don't think passing `null` makes sense for the caller, correct?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of {@link BaseVersionRange} representing a range of version levels.
+ * NOTE: This is the backing class used to define the min/max version levels for finalized features.
+ */
+public class FinalizedVersionRange extends BaseVersionRange {
+    // Label for the min version key, that's used only for serialization/deserialization purposes.
+    private static final String MIN_VERSION_LEVEL_KEY_LABEL = "min_version_level";
+
+    // Label for the max version key, that's used only for serialization/deserialization purposes.
+    private static final String MAX_VERSION_LEVEL_KEY_LABEL = "max_version_level";
+
+    public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) {
+        super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel);
+    }
+
+    public static FinalizedVersionRange deserialize(Map<String, Long> serialized) {
+        return new FinalizedVersionRange(
+            BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, serialized),
+            BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, serialized));
+    }
+
+    private boolean isCompatibleWith(BaseVersionRange versionRange) {
+        return min() >= versionRange.min() && max() <= versionRange.max();

Review comment:
       Just for the sake of argument, I feel we could remove this method and just test:
   ```
   min() < supportedVersionRange.min() || max() > supportedVersionRange.max()
   ```
   for incompatibility.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()

Review comment:
       Is it possible to have no enqueued updater, causing this function to block the thread indefinitely?

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java
##########
@@ -0,0 +1,135 @@
+package org.apache.kafka.common.feature;

Review comment:
       Missing license header.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
##########
@@ -17,13 +17,19 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.feature.Features;

Review comment:
       It seems the style check was not triggered on this new class.

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java
##########
@@ -0,0 +1,135 @@
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FinalizedVersionRangeTest {

Review comment:
       What's the difference between this test class and its super class test case? Same question for `SupportedVersionRangeTest`

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
+class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest
+ * returning the features information in the response. In the future, as the feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time when this
+   *           method is invoked. This result could change in the future whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update operation fails
+   *                         due to invalid parameters or incompatibilities with the broker's
+   *                         supported features. In such a case, the existing cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > latest.epoch) {
+      val errorMsg = ("FinalizedFeatureCache update failed due to invalid epoch in new finalized %s." +
+        " The existing finalized is %s").format(latest, existingStr)
+      throw new FeatureCacheUpdateException(errorMsg)
+    } else {
+      val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features)
+      if (incompatibleFeatures.nonEmpty) {
+        val errorMsg = ("FinalizedFeatureCache updated failed since feature compatibility" +
+          " checks failed! Supported %s has incompatibilities with the latest finalized %s." +
+          " The incompatible features are: %s.").format(
+          SupportedFeatures.get, latest, incompatibleFeatures)
+        throw new FeatureCacheUpdateException(errorMsg)
+      }
+    }
+    val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format(

Review comment:
       Could we move this logic into the inner else branch? Like:
   ```
   else {
         val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features)
         if (incompatibleFeatures.nonEmpty) {
           val errorMsg = ("FinalizedFeatureCache updated failed since feature compatibility" +
             " checks failed! Supported %s has incompatibilities with the latest finalized %s." +
             " The incompatible features are: %s.").format(
             SupportedFeatures.get, latest, incompatibleFeatures)
           throw new FeatureCacheUpdateException(errorMsg)
         } else {
           val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format(
             oldFeatureAndEpoch, latest)
           featuresAndEpoch = Some(latest)
           info(logMsg)
         }
       }
   ```
   It makes the if-else logic tighter.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait

Review comment:
       I don't think prefixing entries with `-` is an accepted Scaladoc comment style. Do you see a warning?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -812,6 +817,8 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens"
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying."
   val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server"
+  /** ********* Feature configuration ***********/
+  val FeatureChangeListenerCacheUpdateWaitTimeMsDoc = "# of milli seconds to wait for feature cache to be updated once."

Review comment:
       `wait time for the first feature cache update upon initialization`

##########
File path: core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
##########
@@ -811,8 +828,16 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Seq.empty, zkClient.getSortedBrokerList)
     assertEquals(None, zkClient.getBroker(0))
 
-    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
-    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+    val brokerInfo0 = createBrokerInfo(

Review comment:
       If we are not validating the features by extracting them, I think we do not need to pass in a non-empty feature list?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of {@link BaseVersionRange} representing a range of version levels.
+ * NOTE: This is the backing class used to define the min/max version levels for finalized features.
+ */
+public class FinalizedVersionRange extends BaseVersionRange {
+    // Label for the min version key, that's used only for serialization/deserialization purposes.
+    private static final String MIN_VERSION_LEVEL_KEY_LABEL = "min_version_level";
+
+    // Label for the max version key, that's used only for serialization/deserialization purposes.
+    private static final String MAX_VERSION_LEVEL_KEY_LABEL = "max_version_level";
+
+    public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) {
+        super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel);
+    }
+
+    public static FinalizedVersionRange deserialize(Map<String, Long> serialized) {
+        return new FinalizedVersionRange(
+            BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, serialized),
+            BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, serialized));
+    }
+
+    private boolean isCompatibleWith(BaseVersionRange versionRange) {
+        return min() >= versionRange.min() && max() <= versionRange.max();
+    }
+
+    /**
+     * Checks if the [min, max] version level range of this object does *NOT* fall within the
+     * [min, max] version range of the provided SupportedVersionRange parameter.
+     *
+     * @param versionRange   the SupportedVersionRange to be checked

Review comment:
       nit: supportedVersionRange

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -146,7 +162,14 @@ object BrokerIdZNode {
     val plaintextEndpoint = broker.endPoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse(
       new EndPoint(null, -1, null, null))
     encode(brokerInfo.version, plaintextEndpoint.host, plaintextEndpoint.port, broker.endPoints, brokerInfo.jmxPort,
-      broker.rack)
+      broker.rack, broker.features)
+  }
+
+  def asJavaMap(brokerInfo: JsonObject): util.Map[String, util.Map[String, java.lang.Long]] = {

Review comment:
       s/asJavaMap/featuresAsJavaMap

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,74 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported by the Broker.
+ * Also provides API to check for incompatibilities between the latest features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[SupportedVersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest feature supported by the
+   * Broker, and the provided cluster-wide finalized feature. This can happen because a provided
+   * cluster-wide finalized feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities need to be checked for.
+   *
+   * @return            The set of incompatible feature names. If the returned set is empty, it
+   *                    means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Set[String] = {

Review comment:
       I'm slightly inclined to return a set of features instead of just strings, and make the string conversion a helper. But I'll leave this up to you to decide; we can always adapt the function to make it more useful in other scenarios as needed.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -81,17 +83,26 @@ object BrokerIdsZNode {
 object BrokerInfo {
 
   /**
-   * Create a broker info with v4 json format (which includes multiple endpoints and rack) if
-   * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise.
+   * - Create a broker info with v5 json format if the apiVersion is 2.6.x or above.
+   * - Create a broker info with v4 json format (which includes multiple endpoints and rack) if
+   *   the apiVersion is 0.10.0.X or above but lesser than 2.6.x.
+   * - Register the broker with v2 json format otherwise.
    *
    * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2.
    *
-   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above without having to
-   * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
+   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above
+   * without having to upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in
+   * any case).
    */
   def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo = {
-    // see method documentation for the reason why we do this
-    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+    val version = {
+      if (apiVersion >= KAFKA_0_10_0_IV1)

Review comment:
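       Aha, the order of the checks for `KAFKA_0_10_0_IV1` and `KAFKA_2_6_IV1` is wrong: any apiVersion >= `KAFKA_2_6_IV1` also satisfies `apiVersion >= KAFKA_0_10_0_IV1`, so the v5 branch can never be reached. `KAFKA_2_6_IV1` should be checked first.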

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {
+  private val VersionKey = "version"
+  private val StatusKey = "status"
+  private val FeaturesKey = "features"
+
+  // Version0 contains 'version', 'status' and 'features' keys.
+  val Version0 = 0
+  val CurrentVersion = Version0
+
+  def path = "/feature"
+
+  def asJavaMap(scalaMap: Map[String, Map[String, Long]]): util.Map[String, util.Map[String, java.lang.Long]] = {
+    scalaMap
+      .view.mapValues(_.view.mapValues(scalaLong => java.lang.Long.valueOf(scalaLong)).toMap.asJava)
+      .toMap
+      .asJava
+  }
+
+  def encode(featureZNode: FeatureZNode): Array[Byte] = {
+    val jsonMap = collection.mutable.Map(
+      VersionKey -> CurrentVersion,
+      StatusKey -> featureZNode.status.id,
+      FeaturesKey -> featureZNode.features.serialize)
+    Json.encodeAsBytes(jsonMap.asJava)
+  }
+
+  def decode(jsonBytes: Array[Byte]): FeatureZNode = {
+    Json.tryParseBytes(jsonBytes) match {
+      case Right(js) =>
+        val featureInfo = js.asJsonObject
+        val version = featureInfo(VersionKey).to[Int]
+        if (version < Version0 || version > CurrentVersion) {
+          throw new KafkaException(s"Unsupported version: $version of feature information: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+
+        val featuresMap = featureInfo
+          .get(FeaturesKey)
+          .flatMap(_.to[Option[Map[String, Map[String, Long]]]])
+        if (featuresMap.isEmpty) {
+          throw new KafkaException("Features map can not be absent in: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+        val features = asJavaMap(featuresMap.get)
+
+        val statusInt = featureInfo
+          .get(StatusKey)
+          .flatMap(_.to[Option[Int]])
+        if (statusInt.isEmpty) {
+          throw new KafkaException("Status can not be absent in feature information: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+        val status = FeatureZNodeStatus.withNameOpt(statusInt.get)
+        if (status.isEmpty) {

Review comment:
       Could we log `statusInt` here as well? Also, I feel the exception should be thrown from `FeatureZNodeStatus.withNameOpt`.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features<VersionRangeType extends BaseVersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<SupportedVersionRange> supportedFeatures(Map<String, SupportedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to FinalizedVersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<FinalizedVersionRange> finalizedFeatures(Map<String, FinalizedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    // Visible for testing.
+    public static Features<FinalizedVersionRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<SupportedVersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public Map<String, VersionRangeType> features() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    /**
+     * @param  feature   name of the feature
+     *
+     * @return           the VersionRangeType corresponding to the feature name, or null if absent
+     */
+    public VersionRangeType get(String feature) {
+        return features.get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   A map with underlying features serialized. The returned value can be deserialized
+     *           using one of the deserialize* APIs.
+     */
+    public Map<String, Map<String, Long>> serialize() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().serialize()));
+    }
+
+    /**
+     * Deserialize a map to Features<FinalizedVersionRange>.
+     *
+     * @param serialized   the serialized representation of a Features<FinalizedVersionRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<FinalizedVersionRange> object
+     */
+    public static Features<FinalizedVersionRange> deserializeFinalizedFeatures(
+        Map<String, Map<String, Long>> serialized) {
+        return finalizedFeatures(serialized.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> FinalizedVersionRange.deserialize(entry.getValue()))));
+    }
+
+    /**
+     * Deserializes a map to Features<SupportedVersionRange>.
+     *
+     * @param serialized   the serialized representation of a Features<SupportedVersionRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<SupportedVersionRange> object
+     */
+    public static Features<SupportedVersionRange> deserializeSupportedFeatures(

Review comment:
       Maybe I'm a bit too obsessive about code duplication, but after making an attempt I think we could share the internal deserialization logic between `deserializeFinalizedFeatures` and `deserializeSupportedFeatures` by introducing a generic helper:
   ```
    public static Features<FinalizedVersionRange> deserializeFinalizedFeatures(Map<String, Map<String, Long>> serialized) {
           return deserializeFeatures(serialized, FinalizedVersionRange::deserialize);
       }
   
       public static Features<SupportedVersionRange> deserializeSupportedFeatures(
           Map<String, Map<String, Long>> serialized) {
           return deserializeFeatures(serialized, SupportedVersionRange::deserialize);
       }
           
       
       private interface Deserializer<V> {
           V deserialize(Map<String, Long> serialized);
       }
   
   
       private static <V extends BaseVersionRange> Features<V> deserializeFeatures(Map<String, Map<String, Long>> serialized, Deserializer<V> deserializer) {
           return new Features<>(serialized.entrySet().stream().collect(
               Collectors.toMap(
                   Map.Entry::getKey,
                   entry -> deserializer.deserialize(entry.getValue()))));
       }
   ```

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,88 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
+class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Int) {
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This cache is populated by a
+ * {@link FinalizedFeatureChangeListener}.
+ *
+ * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest

Review comment:
       I don't think we need to talk about future work in the comment; just make it clear that the read path serving ApiVersionsRequest is currently the only reader.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of VersionRange representing a range of versions.
+ * NOTE: This is the backing class used to define the min/max versions for supported features.

Review comment:
       Why is this a `NOTE`? Could we just write a comment like:
   ```
   An extended BaseVersionRange representing the min/max versions for supported features.
   ```

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification

Review comment:
       nit: provide

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {

Review comment:
       Does the existence of a version guarantee that the feature node contains valid data? In fact, `getDataAndVersion` returns optional data. I checked `ProducerIdManager`, another caller of `getDataAndVersion`, and it handles the empty-data case, which I feel we should do here as well.
   Additionally, since we haven't implemented the write path yet, could we file a ticket with a short description of how the write path should look, covering the different cases such as:
   ```
   empty dataBytes, valid version 
   valid dataBytes, valid version 
   empty dataBytes, unknown version 
   valid dataBytes, unknown version 
   ```
   If that makes sense, it would help us keep track of the design decisions made in this read-path PR when we implement the write path.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")

Review comment:
       Does this log message need to be at info level?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {

Review comment:
       feel neutral about this helper function

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).
+      // In such a case, we prefer to just log a warning and treat the case as if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      processNotification()
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the feature ZK node
+   * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker
+   * would exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature cache to be updated once.
+   *                                   If this parameter <= 0, no wait operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
+
+    if (waitOnceForCacheUpdateMs > 0) {
+      val barrier = new CountDownLatch(1)

Review comment:
       nit: I don't feel strongly about having this parameter.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).
+      // In such a case, we prefer to just log a warning and treat the case as if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      processNotification()
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the feature ZK node
+   * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker
+   * would exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature cache to be updated once.
+   *                                   If this parameter <= 0, no wait operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)

Review comment:
       An educational question: does the zkClient use a separate thread to monitor node changes?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {

Review comment:
       Could we summarize the possible thrown errors in the comment as well? For example, should a JSON deserialization error be treated as fatal?

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {

Review comment:
       I feel it might be worth creating a separate thread to discuss whether we could benefit from the automated protocol generation framework here, as I think this could easily be represented as JSON if we define it in the common package like other RPC data. The difficulty right now is mostly in the serialization and deserialization of the features themselves, but there could be workarounds if we want to pursue this.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).
+      // In such a case, we prefer to just log a warning and treat the case as if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      processNotification()
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the feature ZK node
+   * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker
+   * would exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature cache to be updated once.
+   *                                   If this parameter <= 0, no wait operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
+
+    if (waitOnceForCacheUpdateMs > 0) {
+      val barrier = new CountDownLatch(1)
+      val ensureCacheUpdateOnce = new FeatureCacheUpdater(FeatureZNodeChangeHandler.path, Some(barrier))
+      queue.add(ensureCacheUpdateOnce)
+      try {
+        ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs)
+      } catch {
+        case e: Exception => {
+          close()
+          throw e
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes the feature ZK node change listener by unregistering the listener from ZK client,
+   * clearing the queue and shutting down the ChangeNotificationProcessorThread.
+   */
+  def close(): Unit = {
+    zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path)

Review comment:
       Does the order matter here? I was wondering whether there is any concurrency issue if we unregister before the queue and thread are cleaned up.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)

Review comment:
       `Feature cache update gets interrupted`

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -210,6 +215,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         /* setup zookeeper */
         initZkClient(time)
 
+        /* initialize features */
+        _featureChangeListener = new FinalizedFeatureChangeListener(_zkClient)
+        if (config.interBrokerProtocolVersion >= KAFKA_2_6_IV1) {
+          // The feature versioning system (KIP-584) is active only when:

Review comment:
       I think this comment is not necessary, since we have already documented `KAFKA_2_6_IV1`.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).
+      // In such a case, we prefer to just log a warning and treat the case as if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      processNotification()
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the feature ZK node
+   * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker
+   * would exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature cache to be updated once.
+   *                                   If this parameter <= 0, no wait operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
+
+    if (waitOnceForCacheUpdateMs > 0) {
+      val barrier = new CountDownLatch(1)
+      val ensureCacheUpdateOnce = new FeatureCacheUpdater(FeatureZNodeChangeHandler.path, Some(barrier))
+      queue.add(ensureCacheUpdateOnce)
+      try {
+        ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs)
+      } catch {
+        case e: Exception => {
+          close()
+          throw e
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes the feature ZK node change listener by unregistering the listener from ZK client,
+   * clearing the queue and shutting down the ChangeNotificationProcessorThread.
+   */
+  def close(): Unit = {
+    zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path)
+    queue.clear()
+    thread.shutdown()
+    thread.join()
+  }
+
+  // Useful for testing.

Review comment:
       We could just comment `For testing only`

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,74 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported by the Broker.
+ * Also provides API to check for incompatibilities between the latest features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[SupportedVersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest feature supported by the
+   * Broker, and the provided cluster-wide finalized feature. This can happen because a provided
+   * cluster-wide finalized feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities need to be checked for.
+   *
+   * @return            The set of incompatible feature names. If the returned set is empty, it
+   *                    means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Set[String] = {
+    val incompatibilities = finalized.features.asScala.collect {
+      case (feature, versionLevels) => {
+        val supportedVersions = supportedFeatures.get(feature);
+        if (supportedVersions == null) {
+          (feature, "{feature=%s, reason='Unsupported feature'}".format(feature))
+        } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+          (feature, "{feature=%s, reason='Finalized %s is incompatible with supported %s'}".format(
+            feature, versionLevels, supportedVersions))
+        } else {
+          (feature, null)
+        }
+      }
+    }.filter(entry => entry._2 != null)
+
+    if (incompatibilities.nonEmpty) {

Review comment:
       This logging is duplicated.

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,74 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported by the Broker.
+ * Also provides API to check for incompatibilities between the latest features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.

Review comment:
       nit: Returns a reference to the latest features supported by the broker.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,200 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted during wait
+     *                     - TimeoutException if the wait can not be completed in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that are populated into the
+   * queue. If any change notification can not be processed successfully (unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      processNotification()

Review comment:
       I think that even if this is an operational error, the cluster is at risk of violating the previously enabled feature semantics, which is different from a feature version being unknown from the beginning. I feel we should just exit with a fatal error in this case, but I'm open to discussion.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {
+  private val VersionKey = "version"
+  private val StatusKey = "status"
+  private val FeaturesKey = "features"
+
+  // Version0 contains 'version', 'status' and 'features' keys.
+  val Version0 = 0

Review comment:
       Could we name it V0 for simplicity?

##########
File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala
##########
@@ -1567,6 +1567,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     createRecursive(path, data = null, throwIfPathExists = false)
   }
 
+  // Visible for testing.
+  def createFeatureZNode(nodeContents: FeatureZNode): Unit = {

Review comment:
       If that's the case, I feel we could remove the `// Visible for testing.` comment.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {
+  private val VersionKey = "version"
+  private val StatusKey = "status"
+  private val FeaturesKey = "features"
+
+  // Version0 contains 'version', 'status' and 'features' keys.
+  val Version0 = 0
+  val CurrentVersion = Version0
+
+  def path = "/feature"
+
+  def asJavaMap(scalaMap: Map[String, Map[String, Long]]): util.Map[String, util.Map[String, java.lang.Long]] = {
+    scalaMap
+      .view.mapValues(_.view.mapValues(scalaLong => java.lang.Long.valueOf(scalaLong)).toMap.asJava)
+      .toMap
+      .asJava
+  }
+
+  def encode(featureZNode: FeatureZNode): Array[Byte] = {
+    val jsonMap = collection.mutable.Map(
+      VersionKey -> CurrentVersion,
+      StatusKey -> featureZNode.status.id,
+      FeaturesKey -> featureZNode.features.serialize)
+    Json.encodeAsBytes(jsonMap.asJava)
+  }
+
+  def decode(jsonBytes: Array[Byte]): FeatureZNode = {
+    Json.tryParseBytes(jsonBytes) match {
+      case Right(js) =>
+        val featureInfo = js.asJsonObject
+        val version = featureInfo(VersionKey).to[Int]
+        if (version < Version0 || version > CurrentVersion) {
+          throw new KafkaException(s"Unsupported version: $version of feature information: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+
+        val featuresMap = featureInfo
+          .get(FeaturesKey)
+          .flatMap(_.to[Option[Map[String, Map[String, Long]]]])
+        if (featuresMap.isEmpty) {
+          throw new KafkaException("Features map can not be absent in: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+        val features = asJavaMap(featuresMap.get)
+
+        val statusInt = featureInfo
+          .get(StatusKey)
+          .flatMap(_.to[Option[Int]])
+        if (statusInt.isEmpty) {
+          throw new KafkaException("Status can not be absent in feature information: " +

Review comment:
       Is there a more specific exception type for deserialization errors? `KafkaException` feels a bit too general compared with `IllegalArgumentException`.

##########
File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
##########
@@ -0,0 +1,228 @@
+package kafka.server
+
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion, ZooKeeperTestHarness}
+import kafka.utils.{Exit, TestUtils}
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.internals.FatalExitError
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.concurrent.TimeoutException
+import scala.jdk.CollectionConverters._
+
+class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {

Review comment:
       Could we extract some common initialization logic for the tests to reduce duplication?

##########
File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
##########
@@ -0,0 +1,228 @@
+package kafka.server
+
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion, ZooKeeperTestHarness}
+import kafka.utils.{Exit, TestUtils}
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.internals.FatalExitError
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.concurrent.TimeoutException
+import scala.jdk.CollectionConverters._
+
+class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    FinalizedFeatureCache.clear()
+    SupportedFeatures.clear()
+  }
+
+  /**
+   * Tests that the listener can be initialized, and that it can listen to ZK notifications
+   * successfully from an "Enabled" FeatureZNode (the ZK data has no feature incompatibilities).
+   */
+  @Test
+  def testInitSuccessAndNotificationSuccess(): Unit = {
+    val supportedFeatures = Map[String, SupportedVersionRange](
+      "feature_1" -> new SupportedVersionRange(1, 4),
+      "feature_2" -> new SupportedVersionRange(1, 3))
+    SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava))
+
+    val initialFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
+      "feature_1" -> new FinalizedVersionRange(2, 3))
+    val initialFinalizedFeatures = Features.finalizedFeatures(initialFinalizedFeaturesMap.asJava)
+    zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, initialFinalizedFeatures))
+    val (mayBeFeatureZNodeBytes, initialVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    assertNotEquals(initialVersion, ZkVersion.UnknownVersion)
+    assertFalse(mayBeFeatureZNodeBytes.isEmpty)
+
+    val listener = new FinalizedFeatureChangeListener(zkClient)
+    assertFalse(listener.isListenerInitiated)
+    assertTrue(FinalizedFeatureCache.empty)
+    listener.initOrThrow(15000)
+    assertTrue(listener.isListenerInitiated)
+    val mayBeNewCacheContent = FinalizedFeatureCache.get
+    assertFalse(mayBeNewCacheContent.isEmpty)
+    val newCacheContent = mayBeNewCacheContent.get
+    assertEquals(initialFinalizedFeatures, newCacheContent.features)
+    assertEquals(initialVersion, newCacheContent.epoch)
+
+    val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
+      "feature_1" -> new FinalizedVersionRange(2, 4))
+    val updatedFinalizedFeatures = Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava)
+    zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, updatedFinalizedFeatures))
+    val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
+    assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
+    assertTrue(updatedVersion > initialVersion)
+    TestUtils.waitUntilTrue(() => {
+      FinalizedFeatureCache.get.get.equals(FinalizedFeaturesAndEpoch(updatedFinalizedFeatures, updatedVersion))
+    }, "Timed out waiting for FinalizedFeatureCache to be updated with new features")
+    assertTrue(listener.isListenerInitiated)
+  }
+
+  /**
+   * Tests that the listener can be initialized, and that it can process FeatureZNode deletion
+   * successfully.
+   */
+  @Test
+  def testFeatureZNodeDeleteNotificationProcessing(): Unit = {
+    val supportedFeatures = Map[String, SupportedVersionRange](
+      "feature_1" -> new SupportedVersionRange(1, 4),
+      "feature_2" -> new SupportedVersionRange(1, 3))
+    SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava))
+
+    val initialFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
+      "feature_1" -> new FinalizedVersionRange(2, 3))
+    val initialFinalizedFeatures = Features.finalizedFeatures(initialFinalizedFeaturesMap.asJava)
+    zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, initialFinalizedFeatures))
+    val (mayBeFeatureZNodeBytes, initialVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    assertNotEquals(initialVersion, ZkVersion.UnknownVersion)
+    assertFalse(mayBeFeatureZNodeBytes.isEmpty)
+
+    val listener = new FinalizedFeatureChangeListener(zkClient)
+    assertFalse(listener.isListenerInitiated)
+    assertTrue(FinalizedFeatureCache.empty)
+    listener.initOrThrow(15000)
+    assertTrue(listener.isListenerInitiated)
+    val mayBeNewCacheContent = FinalizedFeatureCache.get
+    assertFalse(mayBeNewCacheContent.isEmpty)
+    val newCacheContent = mayBeNewCacheContent.get
+    assertEquals(initialFinalizedFeatures, newCacheContent.features)
+    assertEquals(initialVersion, newCacheContent.epoch)
+
+    zkClient.deleteFeatureZNode()

Review comment:
       I'm a bit surprised: do we want to support feature znode deletion in the long term?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -776,6 +776,9 @@ class KafkaConfigTest {
         case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
         case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
 
+        //Feature configuration

Review comment:
       nit: add a space after `//` (i.e. `// Feature configuration`).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org