You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/18 20:35:20 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-9755: Implement read path for feature versioning system (KIP-584)

abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r426806321



##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {

Review comment:
       Is this function only used in unit test?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);

Review comment:
       Could be simplified to `new Features<>(features)`.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.

Review comment:
       nit: we could use {@link VersionRangeType} to reference the classes.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);

Review comment:
       nit: just a personal preference, but having one fewer internal reference to the public function `all` makes checking its usages easier, e.g. `features.get(feature)`.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;

Review comment:
       We should ensure `features` is not null

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {

Review comment:
       s/AllAPI/GetAllFeatures

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+

Review comment:
       nit: extra line

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported by the Broker.
+ * Also provides API to check for incompatibilities between the latest features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {

Review comment:
       nit: add a line

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/VersionLevelRangeTest.java
##########
@@ -0,0 +1,162 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class VersionLevelRangeTest {
+
+    @Test
+    public void testCreateFailDueToInvalidParams() {
+        // min and max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(0, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(-1, -1));
+        // min can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(0, 1));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(-1, 1));
+        // max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(1, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(1, -1));
+        // min can't be > max.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(2, 1));
+    }
+
+    @Test
+    public void testSerializeDeserialize() {
+        VersionLevelRange versionLevelRange = new VersionLevelRange(1, 2);
+        assertEquals(1, versionLevelRange.min());
+        assertEquals(2, versionLevelRange.max());
+
+        Map<String, Long> serialized = versionLevelRange.serialize();
+        assertEquals(
+            new HashMap<String, Long>() {
+                {
+                    put("min_version_level", versionLevelRange.min());
+                    put("max_version_level", versionLevelRange.max());
+                }
+            },
+            serialized
+        );
+
+        VersionLevelRange deserialized = VersionLevelRange.deserialize(serialized);
+        assertEquals(1, deserialized.min());
+        assertEquals(2, deserialized.max());
+        assertEquals(versionLevelRange, deserialized);
+    }
+
+    @Test
+    public void testDeserializationFailureTest() {
+        // min_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMinVersion = new HashMap<String, Long>() {
+            {
+                put("min_version_level", 0L);
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMinVersion));
+
+        // max_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, Long>() {
+            {
+                put("min_version_level", 1L);
+                put("max_version_level", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMaxVersion));
+
+        // min_version_level and max_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, Long>() {
+            {
+                put("min_version_level", 0L);
+                put("max_version_level", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMinMaxVersion));
+
+        // min_version_level can't be > max_version_level.
+        Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, Long>() {
+            {
+                put("min_version_level", 2L);
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithLowerMaxVersion));
+
+        // min_version_level key missing.
+        Map<String, Long> invalidWithMinKeyMissing = new HashMap<String, Long>() {
+            {
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithMinKeyMissing));
+
+        // max_version_level key missing.
+        Map<String, Long> invalidWithMaxKeyMissing = new HashMap<String, Long>() {
+            {
+                put("min_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithMaxKeyMissing));
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("VersionLevelRange[1, 1]", new VersionLevelRange(1, 1).toString());
+        assertEquals("VersionLevelRange[1, 2]", new VersionLevelRange(1, 2).toString());
+    }
+
+    @Test
+    public void testEquals() {
+        assertTrue(new VersionLevelRange(1, 1).equals(new VersionLevelRange(1, 1)));
+        assertFalse(new VersionLevelRange(1, 1).equals(new VersionLevelRange(1, 2)));
+    }
+
+    @Test
+    public void testIsCompatibleWith() {
+        assertTrue(new VersionLevelRange(1, 1).isCompatibleWith(new VersionRange(1, 1)));
+        assertTrue(new VersionLevelRange(2, 3).isCompatibleWith(new VersionRange(1, 4)));
+        assertTrue(new VersionLevelRange(1, 4).isCompatibleWith(new VersionRange(1, 4)));
+
+        assertFalse(new VersionLevelRange(1, 4).isCompatibleWith(new VersionRange(2, 3)));
+        assertFalse(new VersionLevelRange(1, 4).isCompatibleWith(new VersionRange(2, 4)));
+        assertFalse(new VersionLevelRange(2, 4).isCompatibleWith(new VersionRange(2, 3)));
+    }
+
+    @Test
+    public void testGetters() {
+        assertEquals(1, new VersionLevelRange(1, 2).min());
+        assertEquals(2, new VersionLevelRange(1, 2).max());
+    }
+}

Review comment:
       nit: new line

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the same.
+     *           The returned value can be deserialized using one of the deserialize* APIs.
+     */
+    public Map<String, Map<String, Long>> serialize() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().serialize()));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionLevelRange>.
+     *
+     * @param serialized   the serialized representation of a Features<VersionLevelRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<VersionLevelRange> object
+     */
+    public static Features<VersionLevelRange> deserializeFinalizedFeatures(
+        Map<String, Map<String, Long>> serialized) {
+        return finalizedFeatures(serialized.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> VersionLevelRange.deserialize(entry.getValue()))));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionRange>.
+     *
+     * @param serialized   the serialized representation of a Features<VersionRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<VersionRange> object
+     */
+    public static Features<VersionRange> deserializeSupportedFeatures(
+        Map<String, Map<String, Long>> serialized) {
+        return supportedFeatures(serialized.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> VersionRange.deserialize(entry.getValue()))));
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {

Review comment:
       We should check whether `other` is `null`.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);

Review comment:
       Also, if a feature could potentially be not found, we should either fail with an `IllegalStateException` or make the return type `Optional<VersionRangeType>`.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the same.
+     *           The returned value can be deserialized using one of the deserialize* APIs.
+     */
+    public Map<String, Map<String, Long>> serialize() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().serialize()));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionLevelRange>.

Review comment:
       s/Deserializes/Deserialize

##########
File path: core/src/main/scala/kafka/cluster/Broker.scala
##########
@@ -34,14 +36,19 @@ object Broker {
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
                                          interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo
+
+  def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = {

Review comment:
       Is this because the write path has not been implemented yet?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +141,26 @@ public static ApiVersionsResponse fromStruct(Struct struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<VersionRange> latestSupportedFeatures,
+        Optional<Features<VersionLevelRange>> finalizedFeatures,
+        Optional<Long> finalizedFeaturesEpoch) {
         if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) {
             return DEFAULT_API_VERSIONS_RESPONSE;
         }
-        return createApiVersionsResponse(throttleTimeMs, maxMagic);
+        return createApiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, finalizedFeatures, finalizedFeaturesEpoch);
     }
 
-    public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) {
+    public static ApiVersionsResponse createApiVersionsResponse(

Review comment:
       Looks like we have some gaps in unit testing `ApiVersionsResponse`. Could we add unit tests for this class, since the logic in `createApiVersionsResponse` becomes non-trivial now?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);

Review comment:
       Same here

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
+class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest
+ * returning the features information in the response. In the future, as the feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time when this
+   *           method is invoked. This result could change in the future whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update operation fails
+   *                         due to invalid parameters or incompatibilities with the broker's
+   *                         supported features. In such a case, the existing cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > latest.epoch) {

Review comment:
       s/ !featuresAndEpoch.isEmpty / featuresAndEpoch.isDefined

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing "finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the same.

Review comment:
       Maybe rephrase as `a map with the underlying features serialized`

##########
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##########
@@ -42,6 +42,33 @@
         "about": "The maximum supported version, inclusive." }
     ]},
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
-      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

Review comment:
       I think we need to bump the schema version to 4? Same with `ApiVersionsRequest.json`

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = Features.supportedFeatures(allFeatures);
+        assertEquals(allFeatures, features.all());
+    }
+
+    @Test
+    public void testGetAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = Features.supportedFeatures(allFeatures);
+        assertEquals(v1, features.get("feature_1"));
+        assertEquals(v2, features.get("feature_2"));
+        assertNull(features.get("nonexistent_feature"));
+    }
+
+    @Test
+    public void testSerializeDeserializeSupportedFeatures() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() {

Review comment:
       Same here

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;

Review comment:
       We need the Apache license header

##########
File path: core/src/main/scala/kafka/cluster/Broker.scala
##########
@@ -34,14 +36,19 @@ object Broker {
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
                                          interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo
+
+  def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = {
+    new Broker(id, endPoints, rack, emptySupportedFeatures)
+  }
 }
 
 /**
  * A Kafka broker.
- * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map.
+ * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map,

Review comment:
       nit: it might make sense to add a doc comment for the parameters:
   ```
   /**
     * 
     * @param id
     * @param endPoints
     * @param rack
     * @param features
     */
   ```

##########
File path: clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = Features.supportedFeatures(allFeatures);
+        assertEquals(allFeatures, features.all());
+    }
+
+    @Test
+    public void testGetAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, VersionRange>() {

Review comment:
       We could use `org.apache.kafka.common.utils.Utils#mkMap` here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org