You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/05/25 18:02:52 UTC
[kafka] branch trunk updated: KAFKA-13833: Remove the min_version_level from the finalized version range written to ZooKeeper (#12062)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 54d60ced86 KAFKA-13833: Remove the min_version_level from the finalized version range written to ZooKeeper (#12062)
54d60ced86 is described below
commit 54d60ced869cbc31776f755c7f464ae2d2ee3e95
Author: dengziming <de...@gmail.com>
AuthorDate: Thu May 26 02:02:34 2022 +0800
KAFKA-13833: Remove the min_version_level from the finalized version range written to ZooKeeper (#12062)
Reviewers: David Arthur <mu...@gmail.com>
---
.../org/apache/kafka/common/feature/Features.java | 28 ----
.../common/feature/FinalizedVersionRange.java | 53 --------
.../common/feature/SupportedVersionRange.java | 12 ++
.../kafka/common/requests/ApiVersionsResponse.java | 23 ++--
.../kafka/clients/admin/KafkaAdminClientTest.java | 17 +--
.../apache/kafka/common/feature/FeaturesTest.java | 54 +-------
.../common/feature/FinalizedVersionRangeTest.java | 80 ------------
.../common/feature/SupportedVersionRangeTest.java | 18 ++-
.../common/requests/ApiVersionsResponseTest.java | 13 +-
.../scala/kafka/controller/KafkaController.scala | 115 ++++++++---------
.../scala/kafka/server/ApiVersionManager.scala | 6 +-
.../main/scala/kafka/server/BrokerFeatures.scala | 29 ++---
.../scala/kafka/server/FinalizedFeatureCache.scala | 27 ++--
.../server/FinalizedFeatureChangeListener.scala | 2 +-
core/src/main/scala/kafka/zk/ZkData.scala | 95 +++++++++-----
.../src/test/scala/kafka/zk/FeatureZNodeTest.scala | 95 +++++++++++---
.../controller/ControllerIntegrationTest.scala | 20 ++-
.../unit/kafka/server/BrokerFeaturesTest.scala | 48 ++++---
.../kafka/server/FinalizedFeatureCacheTest.scala | 18 +--
.../FinalizedFeatureChangeListenerTest.scala | 60 ++++-----
.../unit/kafka/server/UpdateFeaturesTest.scala | 141 ++++++++++-----------
.../kafka/server/common/MetadataVersion.java | 2 +-
.../kafka/server/common/MetadataVersionTest.java | 7 +-
23 files changed, 404 insertions(+), 559 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/feature/Features.java b/clients/src/main/java/org/apache/kafka/common/feature/Features.java
index 4006d71947..a39ee37e53 100644
--- a/clients/src/main/java/org/apache/kafka/common/feature/Features.java
+++ b/clients/src/main/java/org/apache/kafka/common/feature/Features.java
@@ -32,7 +32,6 @@ import static java.util.stream.Collectors.joining;
*
* @param <VersionRangeType> is the type of version range.
* @see SupportedVersionRange
- * @see FinalizedVersionRange
*/
public class Features<VersionRangeType extends BaseVersionRange> {
private final Map<String, VersionRangeType> features;
@@ -57,20 +56,6 @@ public class Features<VersionRangeType extends BaseVersionRange> {
return new Features<>(features);
}
- /**
- * @param features Map of feature name to FinalizedVersionRange.
- *
- * @return Returns a new Features object representing finalized features.
- */
- public static Features<FinalizedVersionRange> finalizedFeatures(Map<String, FinalizedVersionRange> features) {
- return new Features<>(features);
- }
-
- // Visible for testing.
- public static Features<FinalizedVersionRange> emptyFinalizedFeatures() {
- return new Features<>(new HashMap<>());
- }
-
public static Features<SupportedVersionRange> emptySupportedFeatures() {
return new Features<>(new HashMap<>());
}
@@ -138,19 +123,6 @@ public class Features<VersionRangeType extends BaseVersionRange> {
entry -> converter.fromMap(entry.getValue()))));
}
- /**
- * Converts from a map to Features<FinalizedVersionRange>.
- *
- * @param featuresMap the map representation of a Features<FinalizedVersionRange> object,
- * generated using the toMap() API.
- *
- * @return the Features<FinalizedVersionRange> object
- */
- public static Features<FinalizedVersionRange> fromFinalizedFeaturesMap(
- Map<String, Map<String, Short>> featuresMap) {
- return fromFeaturesMap(featuresMap, FinalizedVersionRange::fromMap);
- }
-
/**
* Converts from a map to Features<SupportedVersionRange>.
*
diff --git a/clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java b/clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
deleted file mode 100644
index 27e6440478..0000000000
--- a/clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.feature;
-
-import java.util.Map;
-
-/**
- * An extended {@link BaseVersionRange} representing the min/max versions for a finalized feature.
- */
-public class FinalizedVersionRange extends BaseVersionRange {
- // Label for the min version key, that's used only to convert to/from a map.
- private static final String MIN_VERSION_LEVEL_KEY_LABEL = "min_version_level";
-
- // Label for the max version key, that's used only to convert to/from a map.
- private static final String MAX_VERSION_LEVEL_KEY_LABEL = "max_version_level";
-
- public FinalizedVersionRange(short minVersionLevel, short maxVersionLevel) {
- super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel);
- }
-
- public static FinalizedVersionRange fromMap(Map<String, Short> versionRangeMap) {
- return new FinalizedVersionRange(
- BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, versionRangeMap),
- BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, versionRangeMap));
- }
-
- /**
- * Checks if the [min, max] version level range of this object does *NOT* fall within the
- * [min, max] range of the provided SupportedVersionRange parameter.
- *
- * @param supportedVersionRange the SupportedVersionRange to be checked
- *
- * @return - true, if the version levels are compatible
- * - false otherwise
- */
- public boolean isIncompatibleWith(SupportedVersionRange supportedVersionRange) {
- return min() < supportedVersionRange.min() || max() > supportedVersionRange.max();
- }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java b/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
index 8993014a74..a864a91762 100644
--- a/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
+++ b/clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
@@ -41,4 +41,16 @@ public class SupportedVersionRange extends BaseVersionRange {
BaseVersionRange.valueOrThrow(MIN_VERSION_KEY_LABEL, versionRangeMap),
BaseVersionRange.valueOrThrow(MAX_VERSION_KEY_LABEL, versionRangeMap));
}
+
+ /**
+ * Checks if the version level does *NOT* fall within the [min, max] range of this SupportedVersionRange.
+ *
+ * @param version the version to be checked
+ *
+ * @return - true, if the version levels are incompatible
+ * - false otherwise
+ */
+ public boolean isIncompatibleWith(short version) {
+ return min() > version || max() < version;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 80d026330f..7c98eb2679 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.feature.Features;
-import org.apache.kafka.common.feature.FinalizedVersionRange;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiMessageType.ListenerType;
@@ -35,6 +34,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordVersion;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -118,16 +118,15 @@ public class ApiVersionsResponse extends AbstractResponse {
throttleTimeMs,
apiVersions,
Features.emptySupportedFeatures(),
- Features.emptyFinalizedFeatures(),
- UNKNOWN_FINALIZED_FEATURES_EPOCH
- );
+ Collections.emptyMap(),
+ UNKNOWN_FINALIZED_FEATURES_EPOCH);
}
public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
RecordVersion minRecordVersion,
Features<SupportedVersionRange> latestSupportedFeatures,
- Features<FinalizedVersionRange> finalizedFeatures,
+ Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch,
NodeApiVersions controllerApiVersions,
ListenerType listenerType
@@ -153,7 +152,7 @@ public class ApiVersionsResponse extends AbstractResponse {
int throttleTimeMs,
ApiVersionCollection apiVersions,
Features<SupportedVersionRange> latestSupportedFeatures,
- Features<FinalizedVersionRange> finalizedFeatures,
+ Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
) {
return new ApiVersionsResponse(
@@ -233,7 +232,7 @@ public class ApiVersionsResponse extends AbstractResponse {
final Errors error,
final ApiVersionCollection apiKeys,
final Features<SupportedVersionRange> latestSupportedFeatures,
- final Features<FinalizedVersionRange> finalizedFeatures,
+ final Map<String, Short> finalizedFeatures,
final long finalizedFeaturesEpoch
) {
final ApiVersionsResponseData data = new ApiVersionsResponseData();
@@ -263,14 +262,14 @@ public class ApiVersionsResponse extends AbstractResponse {
}
private static FinalizedFeatureKeyCollection createFinalizedFeatureKeys(
- Features<FinalizedVersionRange> finalizedFeatures) {
+ Map<String, Short> finalizedFeatures) {
FinalizedFeatureKeyCollection converted = new FinalizedFeatureKeyCollection();
- for (Map.Entry<String, FinalizedVersionRange> feature : finalizedFeatures.features().entrySet()) {
+ for (Map.Entry<String, Short> feature : finalizedFeatures.entrySet()) {
final FinalizedFeatureKey key = new FinalizedFeatureKey();
- final FinalizedVersionRange versionLevelRange = feature.getValue();
+ final short versionLevel = feature.getValue();
key.setName(feature.getKey());
- key.setMinVersionLevel(versionLevelRange.min());
- key.setMaxVersionLevel(versionLevelRange.max());
+ key.setMinVersionLevel(versionLevel);
+ key.setMaxVersionLevel(versionLevel);
converted.add(key);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index eb4681856e..637c3d15cb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -545,7 +545,7 @@ public class KafkaAdminClientTest {
private static FeatureMetadata defaultFeatureMetadata() {
return new FeatureMetadata(
- Utils.mkMap(Utils.mkEntry("test_feature_1", new FinalizedVersionRange((short) 2, (short) 3))),
+ Utils.mkMap(Utils.mkEntry("test_feature_1", new FinalizedVersionRange((short) 2, (short) 2))),
Optional.of(1L),
Utils.mkMap(Utils.mkEntry("test_feature_1", new SupportedVersionRange((short) 1, (short) 5))));
}
@@ -563,26 +563,13 @@ public class KafkaAdminClientTest {
return Features.supportedFeatures(featuresMap);
}
- private static Features<org.apache.kafka.common.feature.FinalizedVersionRange> convertFinalizedFeaturesMap(Map<String, FinalizedVersionRange> features) {
- final Map<String, org.apache.kafka.common.feature.FinalizedVersionRange> featuresMap = new HashMap<>();
- for (final Map.Entry<String, FinalizedVersionRange> entry : features.entrySet()) {
- final FinalizedVersionRange versionRange = entry.getValue();
- featuresMap.put(
- entry.getKey(),
- new org.apache.kafka.common.feature.FinalizedVersionRange(
- versionRange.minVersionLevel(), versionRange.maxVersionLevel()));
- }
-
- return Features.finalizedFeatures(featuresMap);
- }
-
private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures(Errors error) {
if (error == Errors.NONE) {
return ApiVersionsResponse.createApiVersionsResponse(
0,
ApiVersionsResponse.filterApis(RecordVersion.current(), ApiMessageType.ListenerType.ZK_BROKER),
convertSupportedFeaturesMap(defaultFeatureMetadata().supportedFeatures()),
- convertFinalizedFeaturesMap(defaultFeatureMetadata().finalizedFeatures()),
+ Collections.singletonMap("test_feature_1", (short) 2),
defaultFeatureMetadata().finalizedFeaturesEpoch().get()
);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java b/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
index 88b3471208..0b2bc4f50a 100644
--- a/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.Test;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,11 +36,6 @@ public class FeaturesTest {
public void testEmptyFeatures() {
Map<String, Map<String, Short>> emptyMap = new HashMap<>();
- Features<FinalizedVersionRange> emptyFinalizedFeatures = Features.emptyFinalizedFeatures();
- assertTrue(emptyFinalizedFeatures.features().isEmpty());
- assertTrue(emptyFinalizedFeatures.toMap().isEmpty());
- assertEquals(emptyFinalizedFeatures, Features.fromFinalizedFeaturesMap(emptyMap));
-
Features<SupportedVersionRange> emptySupportedFeatures = Features.emptySupportedFeatures();
assertTrue(emptySupportedFeatures.features().isEmpty());
assertTrue(emptySupportedFeatures.toMap().isEmpty());
@@ -49,9 +44,6 @@ public class FeaturesTest {
@Test
public void testNullFeatures() {
- assertThrows(
- NullPointerException.class,
- () -> Features.finalizedFeatures(null));
assertThrows(
NullPointerException.class,
() -> Features.supportedFeatures(null));
@@ -93,34 +85,6 @@ public class FeaturesTest {
assertEquals(features, Features.fromSupportedFeaturesMap(expected));
}
- @Test
- public void testFromToFinalizedFeaturesMap() {
- FinalizedVersionRange v1 = new FinalizedVersionRange((short) 1, (short) 2);
- FinalizedVersionRange v2 = new FinalizedVersionRange((short) 3, (short) 4);
- Map<String, FinalizedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
-
- Features<FinalizedVersionRange> features = Features.finalizedFeatures(allFeatures);
-
- Map<String, Map<String, Short>> expected = mkMap(
- mkEntry("feature_1", mkMap(mkEntry("min_version_level", (short) 1), mkEntry("max_version_level", (short) 2))),
- mkEntry("feature_2", mkMap(mkEntry("min_version_level", (short) 3), mkEntry("max_version_level", (short) 4))));
- assertEquals(expected, features.toMap());
- assertEquals(features, Features.fromFinalizedFeaturesMap(expected));
- }
-
- @Test
- public void testToStringFinalizedFeatures() {
- FinalizedVersionRange v1 = new FinalizedVersionRange((short) 1, (short) 2);
- FinalizedVersionRange v2 = new FinalizedVersionRange((short) 3, (short) 4);
- Map<String, FinalizedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
-
- Features<FinalizedVersionRange> features = Features.finalizedFeatures(allFeatures);
-
- assertEquals(
- "Features{(feature_1 -> FinalizedVersionRange[min_version_level:1, max_version_level:2]), (feature_2 -> FinalizedVersionRange[min_version_level:3, max_version_level:4])}",
- features.toString());
- }
-
@Test
public void testToStringSupportedFeatures() {
SupportedVersionRange v1 = new SupportedVersionRange((short) 1, (short) 2);
@@ -145,29 +109,19 @@ public class FeaturesTest {
() -> Features.fromSupportedFeaturesMap(invalidFeatures));
}
- @Test
- public void testFinalizedFeaturesFromMapFailureWithInvalidMissingMaxVersionLevel() {
- // This is invalid because 'max_version_level' key is missing.
- Map<String, Map<String, Short>> invalidFeatures = mkMap(
- mkEntry("feature_1", mkMap(mkEntry("min_version_level", (short) 1))));
- assertThrows(
- IllegalArgumentException.class,
- () -> Features.fromFinalizedFeaturesMap(invalidFeatures));
- }
-
@Test
public void testEquals() {
SupportedVersionRange v1 = new SupportedVersionRange((short) 1, (short) 2);
Map<String, SupportedVersionRange> allFeatures = mkMap(mkEntry("feature_1", v1));
Features<SupportedVersionRange> features = Features.supportedFeatures(allFeatures);
Features<SupportedVersionRange> featuresClone = Features.supportedFeatures(allFeatures);
- assertTrue(features.equals(featuresClone));
+ assertEquals(features, featuresClone);
SupportedVersionRange v2 = new SupportedVersionRange((short) 1, (short) 3);
Map<String, SupportedVersionRange> allFeaturesDifferent = mkMap(mkEntry("feature_1", v2));
Features<SupportedVersionRange> featuresDifferent = Features.supportedFeatures(allFeaturesDifferent);
- assertFalse(features.equals(featuresDifferent));
+ assertNotEquals(features, featuresDifferent);
- assertFalse(features.equals(null));
+ assertNotEquals(null, features);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java b/clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java
deleted file mode 100644
index 989c4bd1a5..0000000000
--- a/clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.feature;
-
-import java.util.Map;
-
-import org.junit.jupiter.api.Test;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Unit tests for the FinalizedVersionRange class.
- *
- * Most of the unit tests required for BaseVersionRange are part of the SupportedVersionRangeTest
- * suite. This suite only tests behavior very specific to FinalizedVersionRange.
- */
-public class FinalizedVersionRangeTest {
-
- @Test
- public void testFromToMap() {
- FinalizedVersionRange versionRange = new FinalizedVersionRange((short) 1, (short) 2);
- assertEquals(1, versionRange.min());
- assertEquals(2, versionRange.max());
-
- Map<String, Short> versionRangeMap = versionRange.toMap();
- assertEquals(
- mkMap(
- mkEntry("min_version_level", versionRange.min()),
- mkEntry("max_version_level", versionRange.max())),
- versionRangeMap);
-
- FinalizedVersionRange newVersionRange = FinalizedVersionRange.fromMap(versionRangeMap);
- assertEquals(1, newVersionRange.min());
- assertEquals(2, newVersionRange.max());
- assertEquals(versionRange, newVersionRange);
- }
-
- @Test
- public void testToString() {
- assertEquals("FinalizedVersionRange[min_version_level:1, max_version_level:1]", new FinalizedVersionRange((short) 1, (short) 1).toString());
- assertEquals("FinalizedVersionRange[min_version_level:1, max_version_level:2]", new FinalizedVersionRange((short) 1, (short) 2).toString());
- }
-
- @Test
- public void testIsCompatibleWith() {
- assertFalse(new FinalizedVersionRange((short) 1, (short) 1).isIncompatibleWith(new SupportedVersionRange((short) 1, (short) 1)));
- assertFalse(new FinalizedVersionRange((short) 2, (short) 3).isIncompatibleWith(new SupportedVersionRange((short) 1, (short) 4)));
- assertFalse(new FinalizedVersionRange((short) 1, (short) 4).isIncompatibleWith(new SupportedVersionRange((short) 1, (short) 4)));
-
- assertTrue(new FinalizedVersionRange((short) 1, (short) 4).isIncompatibleWith(new SupportedVersionRange((short) 2, (short) 3)));
- assertTrue(new FinalizedVersionRange((short) 1, (short) 4).isIncompatibleWith(new SupportedVersionRange((short) 2, (short) 4)));
- assertTrue(new FinalizedVersionRange((short) 2, (short) 4).isIncompatibleWith(new SupportedVersionRange((short) 2, (short) 3)));
- }
-
- @Test
- public void testMinMax() {
- FinalizedVersionRange versionRange = new FinalizedVersionRange((short) 1, (short) 2);
- assertEquals(1, versionRange.min());
- assertEquals(2, versionRange.max());
- }
-}
diff --git a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
index acf452d820..a1d2af419f 100644
--- a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
@@ -25,6 +25,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -128,9 +129,9 @@ public class SupportedVersionRangeTest {
@Test
public void testEquals() {
SupportedVersionRange tested = new SupportedVersionRange((short) 1, (short) 1);
- assertTrue(tested.equals(tested));
- assertFalse(tested.equals(new SupportedVersionRange((short) 1, (short) 2)));
- assertFalse(tested.equals(null));
+ assertEquals(tested, tested);
+ assertNotEquals(tested, new SupportedVersionRange((short) 1, (short) 2));
+ assertNotEquals(null, tested);
}
@Test
@@ -139,4 +140,15 @@ public class SupportedVersionRangeTest {
assertEquals(1, versionRange.min());
assertEquals(2, versionRange.max());
}
+
+ @Test
+ public void testIsIncompatibleWith() {
+ assertFalse(new SupportedVersionRange((short) 1, (short) 1).isIncompatibleWith((short) 1));
+ assertFalse(new SupportedVersionRange((short) 1, (short) 4).isIncompatibleWith((short) 2));
+ assertFalse(new SupportedVersionRange((short) 1, (short) 4).isIncompatibleWith((short) 1));
+ assertFalse(new SupportedVersionRange((short) 1, (short) 4).isIncompatibleWith((short) 4));
+
+ assertTrue(new SupportedVersionRange((short) 2, (short) 3).isIncompatibleWith((short) 1));
+ assertTrue(new SupportedVersionRange((short) 2, (short) 3).isIncompatibleWith((short) 4));
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index c3fa6b892e..62571c6986 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -17,9 +17,9 @@
package org.apache.kafka.common.requests;
+import java.util.Collections;
import java.util.HashSet;
import org.apache.kafka.common.feature.Features;
-import org.apache.kafka.common.feature.FinalizedVersionRange;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiMessageType.ListenerType;
@@ -116,7 +116,7 @@ public class ApiVersionsResponseTest {
10,
RecordVersion.V1,
Features.emptySupportedFeatures(),
- Features.emptyFinalizedFeatures(),
+ Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER
@@ -135,8 +135,7 @@ public class ApiVersionsResponseTest {
RecordVersion.V1,
Features.supportedFeatures(
Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange((short) 1, (short) 4)))),
- Features.finalizedFeatures(
- Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange((short) 2, (short) 3)))),
+ Utils.mkMap(Utils.mkEntry("feature", (short) 3)),
10L,
null,
ListenerType.ZK_BROKER
@@ -152,7 +151,7 @@ public class ApiVersionsResponseTest {
assertEquals(1, response.data().finalizedFeatures().size());
FinalizedFeatureKey fKey = response.data().finalizedFeatures().find("feature");
assertNotNull(fKey);
- assertEquals(2, fKey.minVersionLevel());
+ assertEquals(3, fKey.minVersionLevel());
assertEquals(3, fKey.maxVersionLevel());
assertEquals(10, response.data().finalizedFeaturesEpoch());
}
@@ -163,7 +162,7 @@ public class ApiVersionsResponseTest {
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordVersion.current(),
Features.emptySupportedFeatures(),
- Features.emptyFinalizedFeatures(),
+ Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER
@@ -181,7 +180,7 @@ public class ApiVersionsResponseTest {
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordVersion.current(),
Features.emptySupportedFeatures(),
- Features.emptyFinalizedFeatures(),
+ Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
null,
ListenerType.ZK_BROKER
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f4a8569b80..b753cd408b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -38,7 +38,6 @@ import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterPartitionRequestData, AlterPartitionResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
@@ -309,7 +308,7 @@ class KafkaController(val config: KafkaConfig,
* This method enables the feature versioning system (KIP-584).
*
* Development in Kafka (from a high level) is organized into features. Each feature is tracked by
- * a name and a range of version numbers. A feature can be of two types:
+ * a name and a range of version numbers or a version number. A feature can be of two types:
*
* 1. Supported feature:
* A supported feature is represented by a name (string) and a range of versions (defined by a
@@ -320,8 +319,8 @@ class KafkaController(val config: KafkaConfig,
* range of versions.
*
* 2. Finalized feature:
- * A finalized feature is represented by a name (string) and a range of version levels (defined
- * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is
+ * A finalized feature is represented by a name (string) and a specified version level (defined
+ * by a Short). Whenever the feature versioning system (KIP-584) is
* enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
* In comparison to a supported feature, the key difference is that a finalized feature exists
* in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
@@ -385,21 +384,24 @@ class KafkaController(val config: KafkaConfig,
private def enableFeatureVersioning(): Unit = {
val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
if (version == ZkVersion.UnknownVersion) {
- val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled,
- brokerFeatures.defaultFinalizedFeatures))
+ val newVersion = createFeatureZNode(
+ FeatureZNode(config.interBrokerProtocolVersion,
+ FeatureZNodeStatus.Enabled,
+ brokerFeatures.defaultFinalizedFeatures
+ ))
featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
} else {
val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
val newFeatures = existingFeatureZNode.status match {
case FeatureZNodeStatus.Enabled => existingFeatureZNode.features
case FeatureZNodeStatus.Disabled =>
- if (!existingFeatureZNode.features.empty()) {
+ if (existingFeatureZNode.features.nonEmpty) {
warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" +
s" contains non-empty features: ${existingFeatureZNode.features}")
}
- Features.emptyFinalizedFeatures
+ Map.empty[String, Short]
}
- val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures)
+ val newFeatureZNode = FeatureZNode(config.interBrokerProtocolVersion, FeatureZNodeStatus.Enabled, newFeatures)
if (!newFeatureZNode.equals(existingFeatureZNode)) {
val newVersion = updateFeatureZNode(newFeatureZNode)
featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs)
@@ -423,14 +425,14 @@ class KafkaController(val config: KafkaConfig,
* are disabled when IBP config is < than IBP_2_7_IV0.
*/
private def disableFeatureVersioning(): Unit = {
- val newNode = FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures())
+ val newNode = FeatureZNode(config.interBrokerProtocolVersion, FeatureZNodeStatus.Disabled, Map.empty[String, Short])
val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
if (version == ZkVersion.UnknownVersion) {
createFeatureZNode(newNode)
} else {
val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
if (existingFeatureZNode.status == FeatureZNodeStatus.Disabled &&
- !existingFeatureZNode.features.empty()) {
+ existingFeatureZNode.features.nonEmpty) {
warn(s"FeatureZNode at path: ${FeatureZNode.path} with disabled status" +
s" contains non-empty features: ${existingFeatureZNode.features}")
}
@@ -1087,7 +1089,7 @@ class KafkaController(val config: KafkaConfig,
}
}
- private def registerPartitionModificationsHandlers(topics: Seq[String]) = {
+ private def registerPartitionModificationsHandlers(topics: Seq[String]): Unit = {
topics.foreach { topic =>
val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic)
partitionModificationsHandlers.put(topic, partitionModificationsHandler)
@@ -1095,7 +1097,7 @@ class KafkaController(val config: KafkaConfig,
partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
}
- private[controller] def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
+ private[controller] def unregisterPartitionModificationsHandlers(topics: Seq[String]): Unit = {
topics.foreach { topic =>
partitionModificationsHandlers.remove(topic).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
}
@@ -1907,16 +1909,16 @@ class KafkaController(val config: KafkaConfig,
}
/**
- * Returns the new FinalizedVersionRange for the feature, if there are no feature
+ * Returns the new finalized version for the feature, if there are no feature
* incompatibilities seen with all known brokers for the provided feature update.
* Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
*
* @param update the feature update to be processed (this can not be meant to delete the feature)
*
- * @return the new FinalizedVersionRange or error, as described above.
+ * @return the new finalized version or error, as described above.
*/
- private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequest.FeatureUpdateItem):
- Either[FinalizedVersionRange, ApiError] = {
+ private def newFinalizedVersionOrIncompatibilityError(update: UpdateFeaturesRequest.FeatureUpdateItem):
+ Either[Short, ApiError] = {
if (update.isDeleteRequest) {
throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update")
}
@@ -1927,28 +1929,19 @@ class KafkaController(val config: KafkaConfig,
"Could not apply finalized feature update because the provided feature" +
" is not supported."))
} else {
- var newVersionRange: FinalizedVersionRange = null
- try {
- newVersionRange = new FinalizedVersionRange(update.versionLevel(), update.versionLevel())
- } catch {
- case _: IllegalArgumentException => {
- // This exception means the provided maxVersionLevel is invalid. It is handled below
- // outside of this catch clause.
- }
- }
- if (newVersionRange == null) {
+ val newVersion = update.versionLevel()
+ if (supportedVersionRange.isIncompatibleWith(newVersion)) {
Right(new ApiError(Errors.INVALID_REQUEST,
"Could not apply finalized feature update because the provided" +
- s" maxVersionLevel:${update.versionLevel} is lower than the" +
+ s" versionLevel:${update.versionLevel} is lower than the" +
s" supported minVersion:${supportedVersionRange.min}."))
} else {
- val newFinalizedFeature =
- Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange)))
+ val newFinalizedFeature = Utils.mkMap(Utils.mkEntry(update.feature, newVersion)).asScala.toMap
val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
BrokerFeatures.hasIncompatibleFeatures(broker.features, newFinalizedFeature)
})
if (numIncompatibleBrokers == 0) {
- Left(newVersionRange)
+ Left(newVersion)
} else {
Right(new ApiError(Errors.INVALID_REQUEST,
"Could not apply finalized feature update because" +
@@ -1959,24 +1952,24 @@ class KafkaController(val config: KafkaConfig,
}
/**
- * Validates a feature update on an existing FinalizedVersionRange.
+ * Validates a feature update on an existing finalized version.
* If the validation succeeds, then, the return value contains:
- * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature.
+ * 1. the new finalized version for the feature, if the feature update was not meant to delete the feature.
* 2. Option.empty, if the feature update was meant to delete the feature.
*
* If the validation fails, then returned value contains a suitable ApiError.
*
- * @param update the feature update to be processed.
- * @param existingVersionRange the existing FinalizedVersionRange which can be empty when no
- * FinalizedVersionRange exists for the associated feature
+ * @param update the feature update to be processed.
+ * @param existingVersion the existing finalized version which can be empty when no
+ * finalized version exists for the associated feature
*
- * @return the new FinalizedVersionRange to be updated into ZK or error
- * as described above.
+ * @return the new finalized version to be updated into ZK or error
+ * as described above.
*/
private def validateFeatureUpdate(update: UpdateFeaturesRequest.FeatureUpdateItem,
- existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = {
- def newVersionRangeOrError(update: UpdateFeaturesRequest.FeatureUpdateItem): Either[Option[FinalizedVersionRange], ApiError] = {
- newFinalizedVersionRangeOrIncompatibilityError(update)
+ existingVersion: Option[Short]): Either[Option[Short], ApiError] = {
+ def newVersionRangeOrError(update: UpdateFeaturesRequest.FeatureUpdateItem): Either[Option[Short], ApiError] = {
+ newFinalizedVersionOrIncompatibilityError(update)
.fold(versionRange => Left(Some(versionRange)), error => Right(error))
}
@@ -1989,7 +1982,7 @@ class KafkaController(val config: KafkaConfig,
// We handle deletion requests separately from non-deletion requests.
if (update.isDeleteRequest) {
- if (existingVersionRange.isEmpty) {
+ if (existingVersion.isEmpty) {
// Disallow deletion of a non-existing finalized feature.
Right(new ApiError(Errors.INVALID_REQUEST,
"Can not delete non-existing finalized feature."))
@@ -1999,30 +1992,30 @@ class KafkaController(val config: KafkaConfig,
} else if (update.versionLevel() < 1) {
// Disallow deletion of a finalized feature without SAFE downgrade type.
Right(new ApiError(Errors.INVALID_REQUEST,
- s"Can not provide maxVersionLevel: ${update.versionLevel} less" +
+ s"Can not provide versionLevel: ${update.versionLevel} less" +
s" than 1 without setting the SAFE downgradeType in the request."))
} else {
- existingVersionRange.map(existing =>
- if (update.versionLevel == existing.max) {
- // Disallow a case where target maxVersionLevel matches existing maxVersionLevel.
+ existingVersion.map(existing =>
+ if (update.versionLevel == existing) {
+ // Disallow a case where target versionLevel matches existing versionLevel.
Right(new ApiError(Errors.INVALID_REQUEST,
s"Can not ${if (update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) "downgrade" else "upgrade"}" +
- s" a finalized feature from existing maxVersionLevel:${existing.max}" +
+ s" a finalized feature from existing versionLevel:$existing" +
" to the same value."))
- } else if (update.versionLevel < existing.max && !update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) {
+ } else if (update.versionLevel < existing && !update.upgradeType.equals(UpgradeType.SAFE_DOWNGRADE)) {
// Disallow downgrade of a finalized feature without the downgradeType set.
Right(new ApiError(Errors.INVALID_REQUEST,
s"Can not downgrade finalized feature from existing" +
- s" maxVersionLevel:${existing.max} to provided" +
- s" maxVersionLevel:${update.versionLevel} without setting the" +
+ s" versionLevel:$existing to provided" +
+ s" versionLevel:${update.versionLevel} without setting the" +
" downgradeType to SAFE in the request."))
- } else if (!update.upgradeType.equals(UpgradeType.UPGRADE) && update.versionLevel > existing.max) {
+ } else if (!update.upgradeType.equals(UpgradeType.UPGRADE) && update.versionLevel > existing) {
// Disallow a request that sets downgradeType without specifying a
- // maxVersionLevel that's lower than the existing maxVersionLevel.
+ // versionLevel that's lower than the existing versionLevel.
Right(new ApiError(Errors.INVALID_REQUEST,
- s"When the downgradeType is set to SAFE set in the request, the provided" +
- s" maxVersionLevel:${update.versionLevel} can not be greater than" +
- s" existing maxVersionLevel:${existing.max}."))
+ s"When the downgradeType is set to SAFE in the request, the provided" +
+ s" versionLevel:${update.versionLevel} can not be greater than" +
+ s" existing versionLevel:$existing."))
} else {
newVersionRangeOrError(update)
}
@@ -2044,11 +2037,11 @@ class KafkaController(val config: KafkaConfig,
callback: UpdateFeaturesCallback): Unit = {
val updates = request.featureUpdates
val existingFeatures = featureCache.get
- .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
- .getOrElse(Map[String, FinalizedVersionRange]())
- // A map with key being feature name and value being FinalizedVersionRange.
+ .map(featuresAndEpoch => featuresAndEpoch.features)
+ .getOrElse(Map[String, Short]())
+ // A map with key being feature name and value being finalized version.
// This contains the target features to be eventually written to FeatureZNode.
- val targetFeatures = scala.collection.mutable.Map[String, FinalizedVersionRange]() ++ existingFeatures
+ val targetFeatures = scala.collection.mutable.Map[String, Short]() ++ existingFeatures
// A map with key being feature name and value being error encountered when the FeatureUpdate
// was applied.
val errors = scala.collection.mutable.Map[String, ApiError]()
@@ -2057,7 +2050,7 @@ class KafkaController(val config: KafkaConfig,
// - If a FeatureUpdate is found to be valid, then:
// - The corresponding entry in errors map would be updated to contain Errors.NONE.
// - If the FeatureUpdate is an add or update request, then the targetFeatures map is updated
- // to contain the new FinalizedVersionRange for the feature.
+ // to contain the new finalized version for the feature.
// - Otherwise if the FeatureUpdate is a delete request, then the feature is removed from the
// targetFeatures map.
// - Otherwise if a FeatureUpdate is found to be invalid, then:
@@ -2082,7 +2075,7 @@ class KafkaController(val config: KafkaConfig,
// of the existing finalized features in ZK.
try {
if (!existingFeatures.equals(targetFeatures)) {
- val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, Features.finalizedFeatures(targetFeatures.asJava))
+ val newNode = FeatureZNode(config.interBrokerProtocolVersion, FeatureZNodeStatus.Enabled, targetFeatures)
val newVersion = updateFeatureZNode(newNode)
featureCache.waitUntilEpochOrThrow(newVersion, request.data().timeoutMs())
}
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 61a59ec2ce..ccb13c75d5 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -18,12 +18,12 @@ package kafka.server
import kafka.network
import kafka.network.RequestChannel
-import org.apache.kafka.common.feature.Features
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
import org.apache.kafka.server.common.MetadataVersion
+import java.util.Collections
import scala.jdk.CollectionConverters._
trait ApiVersionManager {
@@ -86,7 +86,7 @@ class DefaultApiVersionManager(
throttleTimeMs,
interBrokerProtocolVersion.highestSupportedRecordVersion,
supportedFeatures,
- finalizedFeatures.features,
+ finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
finalizedFeatures.epoch,
controllerApiVersions.orNull,
listenerType)
@@ -94,7 +94,7 @@ class DefaultApiVersionManager(
throttleTimeMs,
interBrokerProtocolVersion.highestSupportedRecordVersion,
supportedFeatures,
- Features.emptyFinalizedFeatures,
+ Collections.emptyMap(),
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
controllerApiVersions.orNull,
listenerType)
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index ff7e60908c..d385f1eb07 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.utils.Logging
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.MetadataVersion
import java.util
@@ -42,12 +42,10 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
* Returns the default finalized features that a new Kafka cluster with IBP config >= IBP_2_7_IV0
* needs to be bootstrapped with.
*/
- def defaultFinalizedFeatures: Features[FinalizedVersionRange] = {
- Features.finalizedFeatures(
- supportedFeatures.features.asScala.map {
- case(name, versionRange) => (
- name, new FinalizedVersionRange(versionRange.max, versionRange.max))
- }.asJava)
+ def defaultFinalizedFeatures: Map[String, Short] = {
+ supportedFeatures.features.asScala.map {
+ case(name, versionRange) => (name, versionRange.max)
+ }.toMap
}
/**
@@ -65,7 +63,7 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
* @return The subset of input features which are incompatible. If the returned object
* is empty, it means there were no feature incompatibilities found.
*/
- def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+ def incompatibleFeatures(finalized: Map[String, Short]): Map[String, Short] = {
BrokerFeatures.incompatibleFeatures(supportedFeatures, finalized, logIncompatibilities = true)
}
}
@@ -93,19 +91,19 @@ object BrokerFeatures extends Logging {
* - False otherwise.
*/
def hasIncompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
- finalizedFeatures: Features[FinalizedVersionRange]): Boolean = {
- !incompatibleFeatures(supportedFeatures, finalizedFeatures, logIncompatibilities = false).empty
+ finalizedFeatures: Map[String, Short]): Boolean = {
+ incompatibleFeatures(supportedFeatures, finalizedFeatures, logIncompatibilities = false).nonEmpty
}
private def incompatibleFeatures(supportedFeatures: Features[SupportedVersionRange],
- finalizedFeatures: Features[FinalizedVersionRange],
- logIncompatibilities: Boolean): Features[FinalizedVersionRange] = {
- val incompatibleFeaturesInfo = finalizedFeatures.features.asScala.map {
+ finalizedFeatures: Map[String, Short],
+ logIncompatibilities: Boolean): Map[String, Short] = {
+ val incompatibleFeaturesInfo = finalizedFeatures.map {
case (feature, versionLevels) =>
val supportedVersions = supportedFeatures.get(feature)
if (supportedVersions == null) {
(feature, versionLevels, "{feature=%s, reason='Unsupported feature'}".format(feature))
- } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+ } else if (supportedVersions.isIncompatibleWith(versionLevels)) {
(feature, versionLevels, "{feature=%s, reason='%s is incompatible with %s'}".format(
feature, versionLevels, supportedVersions))
} else {
@@ -117,7 +115,6 @@ object BrokerFeatures extends Logging {
warn("Feature incompatibilities seen: " +
incompatibleFeaturesInfo.map { case(_, _, errorReason) => errorReason }.mkString(", "))
}
- Features.finalizedFeatures(
- incompatibleFeaturesInfo.map { case(feature, versionLevels, _) => (feature, versionLevels) }.toMap.asJava)
+ incompatibleFeaturesInfo.map { case(feature, versionLevels, _) => (feature, versionLevels) }.toMap
}
}
diff --git a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
index 390110dba0..d414e7671a 100644
--- a/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
+++ b/core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
@@ -17,23 +17,22 @@
package kafka.server
-import java.util
-import java.util.Collections
import kafka.utils.Logging
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
import org.apache.kafka.image.FeaturesDelta
import org.apache.kafka.server.common.MetadataVersion
+import java.util
+import scala.compat.java8.OptionConverters._
import scala.concurrent.TimeoutException
+import scala.jdk.CollectionConverters._
import scala.math.max
-import scala.compat.java8.OptionConverters._
// Raised whenever there was an error in updating the FinalizedFeatureCache with features.
class FeatureCacheUpdateException(message: String) extends RuntimeException(message) {
}
// Helper class that represents finalized features along with an epoch value.
-case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Long) {
+case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) {
override def toString(): String = {
s"FinalizedFeaturesAndEpoch(features=$features, epoch=$epoch)"
}
@@ -107,7 +106,7 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
* supported features. In such a case, the existing cache contents are
* not modified.
*/
- def updateOrThrow(latestFeatures: Features[FinalizedVersionRange], latestEpoch: Long): Unit = {
+ def updateOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
val latest = FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)
val existing = featuresAndEpoch.map(item => item.toString()).getOrElse("<empty>")
if (featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch > latest.epoch) {
@@ -116,7 +115,7 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
throw new FeatureCacheUpdateException(errorMsg)
} else {
val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features)
- if (!incompatibleFeatures.empty) {
+ if (incompatibleFeatures.nonEmpty) {
val errorMsg = "FinalizedFeatureCache update failed since feature compatibility" +
s" checks failed! Supported ${brokerFeatures.supportedFeatures} has incompatibilities" +
s" with the latest $latest."
@@ -134,21 +133,19 @@ class FinalizedFeatureCache(private val brokerFeatures: BrokerFeatures) extends
def update(featuresDelta: FeaturesDelta, highestMetadataOffset: Long): Unit = {
val features = featuresAndEpoch.getOrElse(
- FinalizedFeaturesAndEpoch(Features.emptyFinalizedFeatures(), -1))
- val newFeatures = new util.HashMap[String, FinalizedVersionRange]()
- newFeatures.putAll(features.features.features())
+ FinalizedFeaturesAndEpoch(Map.empty, -1))
+ val newFeatures = new util.HashMap[String, Short]()
+ newFeatures.putAll(features.features.asJava)
featuresDelta.changes().entrySet().forEach { e =>
e.getValue.asScala match {
case None => newFeatures.remove(e.getKey)
- case Some(version) => newFeatures.put(e.getKey,
- new FinalizedVersionRange(version, version))
+ case Some(version) => newFeatures.put(e.getKey, version)
}
}
featuresDelta.metadataVersionChange().ifPresent { metadataVersion =>
- newFeatures.put(MetadataVersion.FEATURE_NAME, new FinalizedVersionRange(metadataVersion.featureLevel(), metadataVersion.featureLevel()))
+ newFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel())
}
- featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(Features.finalizedFeatures(
- Collections.unmodifiableMap(newFeatures)), highestMetadataOffset))
+ featuresAndEpoch = Some(FinalizedFeaturesAndEpoch(newFeatures.asScala.toMap, highestMetadataOffset))
}
/**
diff --git a/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
index 8f10ab661a..04f138ba7b 100644
--- a/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
+++ b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
@@ -105,7 +105,7 @@ class FinalizedFeatureChangeListener(private val finalizedFeatureCache: Finalize
finalizedFeatureCache.clear()
}
case FeatureZNodeStatus.Enabled => {
- finalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+ finalizedFeatureCache.updateOrThrow(featureZNode.features.toMap, version)
}
case _ => throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode")
}
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 9733e9c83d..7006a21f94 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -33,7 +33,7 @@ import kafka.utils.Json
import kafka.utils.json.JsonObject
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.feature.Features._
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -880,20 +880,37 @@ object FeatureZNodeStatus {
/**
* Represents the contents of the ZK node containing finalized feature information.
*
+ * @param version the version of ZK node, we removed min_version_level in version 2
* @param status the status of the ZK node
* @param features the cluster-wide finalized features
*/
-case class FeatureZNode(status: FeatureZNodeStatus, features: Features[FinalizedVersionRange]) {
+case class FeatureZNode(version: Int, status: FeatureZNodeStatus, features: Map[String, Short]) {
}
object FeatureZNode {
private val VersionKey = "version"
private val StatusKey = "status"
private val FeaturesKey = "features"
+ private val V1MinVersionKey = "min_version_level"
+ private val V1MaxVersionKey = "max_version_level"
// V1 contains 'version', 'status' and 'features' keys.
val V1 = 1
- val CurrentVersion = V1
+ // V2 removes min_version_level
+ val V2 = 2
+
+ /**
+ * - Create a feature info with v1 json format if if the metadataVersion is before 3.2.0
+ * - Create a feature znode with v2 json format if the metadataVersion is 3.2.1 or above.
+ */
+ def apply(metadataVersion: MetadataVersion, status: FeatureZNodeStatus, features: Map[String, Short]): FeatureZNode = {
+ val version = if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
+ V2
+ } else {
+ V1
+ }
+ FeatureZNode(version, status, features)
+ }
def path = "/feature"
@@ -914,10 +931,19 @@ object FeatureZNode {
* @return JSON representation of the FeatureZNode, as an Array[Byte]
*/
def encode(featureZNode: FeatureZNode): Array[Byte] = {
+ val features = if (featureZNode.version == V1) {
+ asJavaMap(featureZNode.features.map{
+ case (feature, version) => feature -> Map(V1MaxVersionKey -> version, V1MinVersionKey -> version)
+ })
+ } else {
+ asJavaMap(featureZNode.features.map{
+ case (feature, version) => feature -> Map(V1MaxVersionKey -> version)
+ })
+ }
val jsonMap = collection.mutable.Map(
- VersionKey -> CurrentVersion,
+ VersionKey -> featureZNode.version,
StatusKey -> featureZNode.status.id,
- FeaturesKey -> featureZNode.features.toMap)
+ FeaturesKey -> features)
Json.encodeAsBytes(jsonMap.asJava)
}
@@ -935,27 +961,11 @@ object FeatureZNode {
case Right(js) =>
val featureInfo = js.asJsonObject
val version = featureInfo(VersionKey).to[Int]
- if (version < V1) {
+ if (version < V1 || version > V2) {
throw new IllegalArgumentException(s"Unsupported version: $version of feature information: " +
s"${new String(jsonBytes, UTF_8)}")
}
- val featuresMap = featureInfo
- .get(FeaturesKey)
- .flatMap(_.to[Option[Map[String, Map[String, Int]]]])
-
- if (featuresMap.isEmpty) {
- throw new IllegalArgumentException("Features map can not be absent in: " +
- s"${new String(jsonBytes, UTF_8)}")
- }
- val features = asJavaMap(
- featuresMap
- .map(theMap => theMap.map {
- case (featureName, versionInfo) => featureName -> versionInfo.map {
- case (label, version) => label -> version.asInstanceOf[Short]
- }
- }).getOrElse(Map[String, Map[String, Short]]()))
-
val statusInt = featureInfo
.get(StatusKey)
.flatMap(_.to[Option[Int]])
@@ -969,19 +979,44 @@ object FeatureZNode {
s"Malformed status: $statusInt found in feature information: ${new String(jsonBytes, UTF_8)}")
}
- var finalizedFeatures: Features[FinalizedVersionRange] = null
- try {
- finalizedFeatures = fromFinalizedFeaturesMap(features)
- } catch {
- case e: Exception => throw new IllegalArgumentException(
- "Unable to convert to finalized features from map: " + features, e)
- }
- FeatureZNode(status.get, finalizedFeatures)
+ val finalizedFeatures = decodeFeature(version, featureInfo, jsonBytes)
+ FeatureZNode(version, status.get, finalizedFeatures)
case Left(e) =>
throw new IllegalArgumentException(s"Failed to parse feature information: " +
s"${new String(jsonBytes, UTF_8)}", e)
}
}
+
+ private def decodeFeature(version: Int, featureInfo: JsonObject, jsonBytes: Array[Byte]): Map[String, Short] = {
+ val featuresMap = featureInfo
+ .get(FeaturesKey)
+ .flatMap(_.to[Option[Map[String, Map[String, Int]]]])
+
+ if (featuresMap.isEmpty) {
+ throw new IllegalArgumentException("Features map can not be absent in: " +
+ s"${new String(jsonBytes, UTF_8)}")
+ }
+ featuresMap.get.map {
+ case (featureName, versionInfo) =>
+ if (version == V1 && !versionInfo.contains(V1MinVersionKey)) {
+ throw new IllegalArgumentException(s"$V1MinVersionKey absent in [$versionInfo]")
+ }
+ if (!versionInfo.contains(V1MaxVersionKey)) {
+ throw new IllegalArgumentException(s"$V1MaxVersionKey absent in [$versionInfo]")
+ }
+
+ val minValueOpt = versionInfo.get(V1MinVersionKey)
+ val maxValue = versionInfo(V1MaxVersionKey)
+
+ if (version == V1 && (minValueOpt.get < 1 || maxValue < minValueOpt.get)) {
+ throw new IllegalArgumentException(s"Expected minValue >= 1, maxValue >= 1 and maxValue >= minValue, but received minValue: ${minValueOpt.get}, maxValue: $maxValue")
+ }
+ if (maxValue < 1) {
+ throw new IllegalArgumentException(s"Expected maxValue >= 1, but received maxValue: $maxValue")
+ }
+ featureName -> maxValue.toShort
+ }
+ }
}
object ZkData {
diff --git a/core/src/test/scala/kafka/zk/FeatureZNodeTest.scala b/core/src/test/scala/kafka/zk/FeatureZNodeTest.scala
index 9344724ff1..b7778c1d5f 100644
--- a/core/src/test/scala/kafka/zk/FeatureZNodeTest.scala
+++ b/core/src/test/scala/kafka/zk/FeatureZNodeTest.scala
@@ -17,31 +17,40 @@
package kafka.zk
-import java.nio.charset.StandardCharsets
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
-import org.apache.kafka.common.feature.Features._
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.apache.kafka.server.common.MetadataVersion.{IBP_3_2_IV0, IBP_3_3_IV0}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertThrows}
import org.junit.jupiter.api.Test
-import scala.jdk.CollectionConverters._
+import java.nio.charset.StandardCharsets
class FeatureZNodeTest {
@Test
def testEncodeDecode(): Unit = {
- val featureZNode = FeatureZNode(
+ val featureZNodeV1 = FeatureZNode(
+ IBP_3_2_IV0,
+ FeatureZNodeStatus.Enabled,
+
+ Map[String, Short](
+ "feature1" -> 2,
+ "feature2" -> 4))
+ val decodedV1 = FeatureZNode.decode(FeatureZNode.encode(featureZNodeV1))
+ assertEquals(featureZNodeV1, decodedV1)
+
+ val featureZNodeV2 = FeatureZNode(
+ IBP_3_3_IV0,
FeatureZNodeStatus.Enabled,
- Features.finalizedFeatures(
- Map[String, FinalizedVersionRange](
- "feature1" -> new FinalizedVersionRange(1, 2),
- "feature2" -> new FinalizedVersionRange(2, 4)).asJava))
- val decoded = FeatureZNode.decode(FeatureZNode.encode(featureZNode))
- assertEquals(featureZNode, decoded)
+
+ Map[String, Short](
+ "feature1" -> 2,
+ "feature2" -> 4))
+ val decodedV2 = FeatureZNode.decode(FeatureZNode.encode(featureZNodeV2))
+ assertEquals(featureZNodeV2, decodedV2)
}
@Test
- def testDecodeSuccess(): Unit = {
+ def testDecodeSuccessV1(): Unit = {
val featureZNodeStrTemplate = """{
"version":1,
"status":1,
@@ -52,15 +61,36 @@ class FeatureZNodeTest {
val node1 = FeatureZNode.decode(featureZNodeStrTemplate.format(validFeatures).getBytes(StandardCharsets.UTF_8))
assertEquals(FeatureZNodeStatus.Enabled, node1.status)
assertEquals(
- Features.finalizedFeatures(
- Map[String, FinalizedVersionRange](
- "feature1" -> new FinalizedVersionRange(1, 2),
- "feature2" -> new FinalizedVersionRange(2, 4)).asJava), node1.features)
+ Map[String, Short](
+ "feature1" -> 2,
+ "feature2" -> 4), node1.features)
+
+ val emptyFeatures = "{}"
+ val node2 = FeatureZNode.decode(featureZNodeStrTemplate.format(emptyFeatures).getBytes(StandardCharsets.UTF_8))
+ assertEquals(FeatureZNodeStatus.Enabled, node2.status)
+ assertEquals(Map.empty, node2.features)
+ }
+
+ @Test
+ def testDecodeSuccessV2(): Unit = {
+ val featureZNodeStrTemplate = """{
+ "version":2,
+ "status":1,
+ "features":%s
+ }"""
+
+ val validFeatures = """{"feature1": {"max_version_level": 2}, "feature2": {"max_version_level": 4}}"""
+ val node1 = FeatureZNode.decode(featureZNodeStrTemplate.format(validFeatures).getBytes(StandardCharsets.UTF_8))
+ assertEquals(FeatureZNodeStatus.Enabled, node1.status)
+ assertEquals(
+ Map[String, Short](
+ "feature1" -> 2,
+ "feature2" -> 4), node1.features)
val emptyFeatures = "{}"
val node2 = FeatureZNode.decode(featureZNodeStrTemplate.format(emptyFeatures).getBytes(StandardCharsets.UTF_8))
assertEquals(FeatureZNodeStatus.Enabled, node2.status)
- assertEquals(emptyFinalizedFeatures, node2.features)
+ assertEquals(Map.empty, node2.features)
}
@Test
@@ -73,11 +103,11 @@ class FeatureZNodeTest {
}"""
assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(FeatureZNode.V1 - 1, 1).getBytes(StandardCharsets.UTF_8)))
val invalidStatus = FeatureZNodeStatus.Enabled.id + 1
- assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(FeatureZNode.CurrentVersion, invalidStatus).getBytes(StandardCharsets.UTF_8)))
+ assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(FeatureZNode.V2, invalidStatus).getBytes(StandardCharsets.UTF_8)))
}
@Test
- def testDecodeFailOnInvalidFeatures(): Unit = {
+ def testDecodeFailOnInvalidFeaturesV1(): Unit = {
val featureZNodeStrTemplate =
"""{
"version":1,
@@ -99,4 +129,29 @@ class FeatureZNodeTest {
val invalidFeaturesMissingMinVersionLevel = ""","features":{"feature1": {"max_version_level": 1}}"""
assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(invalidFeaturesMissingMinVersionLevel).getBytes(StandardCharsets.UTF_8)))
}
+
+ @Test
+ def testDecodeFailOnInvalidFeaturesV2(): Unit = {
+ val featureZNodeStrTemplate =
+ """{
+ "version":2,
+ "status":1%s
+ }"""
+
+ val missingFeatures = ""
+ assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(missingFeatures).getBytes(StandardCharsets.UTF_8)))
+
+ val malformedFeatures = ""","features":{"feature1": {"min_version_level": 1, "max_version_level": 2}, "partial"}"""
+ assertThrows(classOf[IllegalArgumentException], () => FeatureZNode.decode(featureZNodeStrTemplate.format(malformedFeatures).getBytes(StandardCharsets.UTF_8)))
+
+ // We only inspect these configs in v1
+ val invalidFeaturesMinVersionLevel = ""","features":{"feature1": {"min_version_level": 0, "max_version_level": 2}}"""
+ assertDoesNotThrow(() => FeatureZNode.decode(featureZNodeStrTemplate.format(invalidFeaturesMinVersionLevel).getBytes(StandardCharsets.UTF_8)))
+
+ val invalidFeaturesMaxVersionLevel = ""","features":{"feature1": {"min_version_level": 2, "max_version_level": 1}}"""
+ assertDoesNotThrow(() => FeatureZNode.decode(featureZNodeStrTemplate.format(invalidFeaturesMaxVersionLevel).getBytes(StandardCharsets.UTF_8)))
+
+ val invalidFeaturesMissingMinVersionLevel = ""","features":{"feature1": {"max_version_level": 1}}"""
+ assertDoesNotThrow(() => FeatureZNode.decode(featureZNodeStrTemplate.format(invalidFeaturesMissingMinVersionLevel).getBytes(StandardCharsets.UTF_8)))
+ }
}
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index d53fc763eb..d8d90ede09 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -19,7 +19,6 @@ package kafka.controller
import java.util.Properties
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue, TimeUnit}
-
import com.yammer.metrics.core.Timer
import kafka.api.LeaderAndIsr
import kafka.controller.KafkaController.AlterPartitionCallback
@@ -27,13 +26,12 @@ import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
import kafka.utils.{LogCaptureAppender, TestUtils}
import kafka.zk.{FeatureZNodeStatus, _}
import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
-import org.apache.kafka.common.feature.Features
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_2_6_IV0, IBP_2_7_IV0}
+import org.apache.kafka.server.common.MetadataVersion.{IBP_2_6_IV0, IBP_2_7_IV0, IBP_3_2_IV0}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.log4j.Level
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
@@ -637,12 +635,12 @@ class ControllerIntegrationTest extends QuorumTestHarness {
@Test
def testControllerFeatureZNodeSetupWhenFeatureVersioningIsEnabledWithDisabledExistingFeatureZNode(): Unit = {
- testControllerFeatureZNodeSetup(Some(new FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures())), IBP_2_7_IV0)
+ testControllerFeatureZNodeSetup(Some(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, Map.empty[String, Short])), IBP_2_7_IV0)
}
@Test
def testControllerFeatureZNodeSetupWhenFeatureVersioningIsEnabledWithEnabledExistingFeatureZNode(): Unit = {
- testControllerFeatureZNodeSetup(Some(new FeatureZNode(FeatureZNodeStatus.Enabled, Features.emptyFinalizedFeatures())), IBP_2_7_IV0)
+ testControllerFeatureZNodeSetup(Some(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, Map.empty[String, Short])), IBP_2_7_IV0)
}
@Test
@@ -652,12 +650,12 @@ class ControllerIntegrationTest extends QuorumTestHarness {
@Test
def testControllerFeatureZNodeSetupWhenFeatureVersioningIsDisabledWithDisabledExistingFeatureZNode(): Unit = {
- testControllerFeatureZNodeSetup(Some(new FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures())), IBP_2_6_IV0)
+ testControllerFeatureZNodeSetup(Some(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, Map.empty[String, Short])), IBP_2_6_IV0)
}
@Test
def testControllerFeatureZNodeSetupWhenFeatureVersioningIsDisabledWithEnabledExistingFeatureZNode(): Unit = {
- testControllerFeatureZNodeSetup(Some(new FeatureZNode(FeatureZNodeStatus.Enabled, Features.emptyFinalizedFeatures())), IBP_2_6_IV0)
+ testControllerFeatureZNodeSetup(Some(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, Map.empty[String, Short])), IBP_2_6_IV0)
}
@Test
@@ -812,7 +810,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
val (mayBeFeatureZNodeBytes, versionAfter) = zkClient.getDataAndVersion(FeatureZNode.path)
val newZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
if (interBrokerProtocolVersion.isAtLeast(IBP_2_7_IV0)) {
- val emptyZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, Features.emptyFinalizedFeatures)
+ val emptyZNode = FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, Map.empty[String, Short])
initialZNode match {
case Some(node) => {
node.status match {
@@ -826,10 +824,10 @@ class ControllerIntegrationTest extends QuorumTestHarness {
}
case None =>
assertEquals(0, versionAfter)
- assertEquals(new FeatureZNode(FeatureZNodeStatus.Enabled, Features.emptyFinalizedFeatures), newZNode)
+ assertEquals(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, Map.empty[String, Short]), newZNode)
}
} else {
- val emptyZNode = new FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures)
+ val emptyZNode = FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, Map.empty[String, Short])
initialZNode match {
case Some(node) => {
node.status match {
@@ -843,7 +841,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
}
case None =>
assertEquals(0, versionAfter)
- assertEquals(new FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures), newZNode)
+ assertEquals(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, Map.empty[String, Short]), newZNode)
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
index eab3928483..ad8786c919 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala
@@ -17,9 +17,10 @@
package kafka.server
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{Disabled, Test}
+import org.junit.jupiter.api.Test
import scala.jdk.CollectionConverters._
@@ -38,15 +39,12 @@ class BrokerFeaturesTest {
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
brokerFeatures.setSupportedFeatures(supportedFeatures)
- val compatibleFeatures = Map[String, FinalizedVersionRange](
- "test_feature_1" -> new FinalizedVersionRange(2, 3))
- val inCompatibleFeatures = Map[String, FinalizedVersionRange](
- "test_feature_3" -> new FinalizedVersionRange(3, 4))
+ val compatibleFeatures = Map[String, Short]("test_feature_1" -> 4)
+ val inCompatibleFeatures = Map[String, Short]("test_feature_3" -> 4)
val features = compatibleFeatures++inCompatibleFeatures
- val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+ val finalizedFeatures = features
- assertEquals(
- Features.finalizedFeatures(inCompatibleFeatures.asJava),
+ assertEquals(inCompatibleFeatures,
brokerFeatures.incompatibleFeatures(finalizedFeatures))
assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
}
@@ -59,15 +57,13 @@ class BrokerFeaturesTest {
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
brokerFeatures.setSupportedFeatures(supportedFeatures)
- val compatibleFeatures = Map[String, FinalizedVersionRange](
- "test_feature_1" -> new FinalizedVersionRange(2, 3))
- val inCompatibleFeatures = Map[String, FinalizedVersionRange](
- "test_feature_2" -> new FinalizedVersionRange(1, 4))
+ val compatibleFeatures = Map[String, Short]("test_feature_1" -> 3)
+ val inCompatibleFeatures = Map[String, Short]("test_feature_2" -> 4)
val features = compatibleFeatures++inCompatibleFeatures
- val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+ val finalizedFeatures = features
assertEquals(
- Features.finalizedFeatures(inCompatibleFeatures.asJava),
+ inCompatibleFeatures,
brokerFeatures.incompatibleFeatures(finalizedFeatures))
assertTrue(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
}
@@ -80,16 +76,15 @@ class BrokerFeaturesTest {
"test_feature_2" -> new SupportedVersionRange(1, 3)).asJava)
brokerFeatures.setSupportedFeatures(supportedFeatures)
- val compatibleFeatures = Map[String, FinalizedVersionRange](
- "test_feature_1" -> new FinalizedVersionRange(2, 3),
- "test_feature_2" -> new FinalizedVersionRange(1, 3))
- val finalizedFeatures = Features.finalizedFeatures(compatibleFeatures.asJava)
- assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).empty())
+ val compatibleFeatures = Map[String, Short](
+ "test_feature_1" -> 3,
+ "test_feature_2" -> 3)
+ val finalizedFeatures = compatibleFeatures
+ assertTrue(brokerFeatures.incompatibleFeatures(finalizedFeatures).isEmpty)
assertFalse(BrokerFeatures.hasIncompatibleFeatures(supportedFeatures, finalizedFeatures))
}
@Test
- @Disabled("Need to remove or rewrite this test after we fully remove FinalizedVersionRange")
def testDefaultFinalizedFeatures(): Unit = {
val brokerFeatures = BrokerFeatures.createDefault()
val supportedFeatures = Features.supportedFeatures(Map[String, SupportedVersionRange](
@@ -98,10 +93,11 @@ class BrokerFeaturesTest {
"test_feature_3" -> new SupportedVersionRange(3, 7)).asJava)
brokerFeatures.setSupportedFeatures(supportedFeatures)
- val expectedFeatures = Map[String, FinalizedVersionRange](
- "test_feature_1" -> new FinalizedVersionRange(1, 4),
- "test_feature_2" -> new FinalizedVersionRange(1, 3),
- "test_feature_3" -> new FinalizedVersionRange(3, 7))
- assertEquals(Features.finalizedFeatures(expectedFeatures.asJava), brokerFeatures.defaultFinalizedFeatures)
+ val expectedFeatures = Map[String, Short](
+ MetadataVersion.FEATURE_NAME -> MetadataVersion.latest().featureLevel(),
+ "test_feature_1" -> 4,
+ "test_feature_2" -> 3,
+ "test_feature_3" -> 7)
+ assertEquals(expectedFeatures, brokerFeatures.defaultFinalizedFeatures)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
index d0f4c0ab05..2836fbf8da 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala
@@ -17,7 +17,7 @@
package kafka.server
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
@@ -37,9 +37,7 @@ class FinalizedFeatureCacheTest {
val brokerFeatures = BrokerFeatures.createDefault()
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
- val features = Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(1, 4))
- val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+ val finalizedFeatures = Map[String, Short]("feature_1" -> 4)
val cache = new FinalizedFeatureCache(brokerFeatures)
cache.updateOrThrow(finalizedFeatures, 10)
@@ -62,9 +60,7 @@ class FinalizedFeatureCacheTest {
val brokerFeatures = BrokerFeatures.createDefault()
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
- val features = Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(1, 2))
- val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+ val finalizedFeatures = Map[String, Short]("feature_1" -> 2)
val cache = new FinalizedFeatureCache(brokerFeatures)
assertThrows(classOf[FeatureCacheUpdateException], () => cache.updateOrThrow(finalizedFeatures, 12))
@@ -80,9 +76,7 @@ class FinalizedFeatureCacheTest {
val brokerFeatures = BrokerFeatures.createDefault()
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
- val features = Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(2, 3))
- val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+ val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
val cache = new FinalizedFeatureCache(brokerFeatures)
cache.updateOrThrow(finalizedFeatures, 12)
@@ -98,9 +92,7 @@ class FinalizedFeatureCacheTest {
val brokerFeatures = BrokerFeatures.createDefault()
brokerFeatures.setSupportedFeatures(Features.supportedFeatures(supportedFeatures.asJava))
- val features = Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(2, 3))
- val finalizedFeatures = Features.finalizedFeatures(features.asJava)
+ val finalizedFeatures = Map[String, Short]("feature_1" -> 3)
val cache = new FinalizedFeatureCache(brokerFeatures)
cache.updateOrThrow(finalizedFeatures, 12)
diff --git a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
index d59474efd4..67986d1683 100644
--- a/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
@@ -17,17 +17,16 @@
package kafka.server
-import java.util.concurrent.{CountDownLatch, TimeoutException}
-
-import kafka.server.QuorumTestHarness
-import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.utils.Exit
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
+import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0
import org.apache.kafka.test.{TestUtils => JTestUtils}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
+import java.util.concurrent.{CountDownLatch, TimeoutException}
import scala.jdk.CollectionConverters._
class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
@@ -42,14 +41,12 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
}
private def createFinalizedFeatures(): FinalizedFeaturesAndEpoch = {
- val finalizedFeaturesMap = Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(2, 3))
- val finalizedFeatures = Features.finalizedFeatures(finalizedFeaturesMap.asJava)
- zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, finalizedFeatures))
+ val finalizedFeaturesMap = Map[String, Short]("feature_1" -> 3)
+ zkClient.createFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, finalizedFeaturesMap))
val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path)
assertNotEquals(version, ZkVersion.UnknownVersion)
assertFalse(mayBeFeatureZNodeBytes.isEmpty)
- FinalizedFeaturesAndEpoch(finalizedFeatures, version)
+ FinalizedFeaturesAndEpoch(finalizedFeaturesMap, version)
}
private def createListener(
@@ -87,8 +84,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
val cache = new FinalizedFeatureCache(brokerFeatures)
val listener = createListener(cache, Some(initialFinalizedFeatures))
- def updateAndCheckCache(finalizedFeatures: Features[FinalizedVersionRange]): Unit = {
- zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, finalizedFeatures))
+ def updateAndCheckCache(finalizedFeatures: Map[String, Short]): Unit = {
+ zkClient.updateFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, finalizedFeatures))
val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
@@ -101,18 +98,14 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
// Check if the write succeeds and a ZK notification is received that causes the feature cache
// to be populated.
- updateAndCheckCache(
- Features.finalizedFeatures(
- Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(2, 4)).asJava))
+ updateAndCheckCache(Map[String, Short]("feature_1" -> 4))
// Check if second write succeeds and a ZK notification is again received that causes the cache
// to be populated. This check is needed to verify that the watch on the FeatureZNode was
// re-established after the notification was received due to the first write above.
updateAndCheckCache(
- Features.finalizedFeatures(
- Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(2, 4),
- "feature_2" -> new FinalizedVersionRange(1, 3)).asJava))
+ Map[String, Short](
+ "feature_1" -> 4,
+ "feature_2" -> 3))
}
/**
@@ -146,9 +139,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
val cache = new FinalizedFeatureCache(brokerFeatures)
val initialFinalizedFeatures = createFinalizedFeatures()
- val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange]()
- val updatedFinalizedFeatures = Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava)
- zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Disabled, updatedFinalizedFeatures))
+ val updatedFinalizedFeaturesMap = Map[String, Short]()
+ zkClient.updateFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, updatedFinalizedFeaturesMap))
val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
@@ -169,9 +161,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
assertThrows(classOf[TimeoutException], () => cache.waitUntilEpochOrThrow(initialFinalizedFeatures.epoch + 1, JTestUtils.DEFAULT_MAX_WAIT_MS))
- val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange]()
- val updatedFinalizedFeatures = Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava)
- zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Disabled, updatedFinalizedFeatures))
+ val updatedFinalizedFeaturesMap = Map[String, Short]()
+ zkClient.updateFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Disabled, updatedFinalizedFeaturesMap))
val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
@@ -191,10 +182,8 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
val brokerFeatures = createBrokerFeatures()
val cache = new FinalizedFeatureCache(brokerFeatures)
- val incompatibleFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(2, 5))
- val incompatibleFinalizedFeatures = Features.finalizedFeatures(incompatibleFinalizedFeaturesMap.asJava)
- zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, incompatibleFinalizedFeatures))
+ val incompatibleFinalizedFeaturesMap = Map[String, Short]("feature_1" -> 5)
+ zkClient.createFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, incompatibleFinalizedFeaturesMap))
val (mayBeFeatureZNodeBytes, initialVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
assertNotEquals(initialVersion, ZkVersion.UnknownVersion)
assertFalse(mayBeFeatureZNodeBytes.isEmpty)
@@ -240,12 +229,9 @@ class FinalizedFeatureChangeListenerTest extends QuorumTestHarness {
val exitLatch = new CountDownLatch(1)
Exit.setExitProcedure((_, _) => exitLatch.countDown())
- val incompatibleFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
- "feature_1" -> new FinalizedVersionRange(
- brokerFeatures.supportedFeatures.get("feature_1").min(),
- (brokerFeatures.supportedFeatures.get("feature_1").max() + 1).asInstanceOf[Short]))
- val incompatibleFinalizedFeatures = Features.finalizedFeatures(incompatibleFinalizedFeaturesMap.asJava)
- zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, incompatibleFinalizedFeatures))
+ val incompatibleFinalizedFeaturesMap = Map[String, Short](
+ "feature_1" -> (brokerFeatures.supportedFeatures.get("feature_1").max() + 1).asInstanceOf[Short])
+ zkClient.updateFeatureZNode(FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, incompatibleFinalizedFeaturesMap))
val (mayBeFeatureZNodeIncompatibleBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
assertFalse(mayBeFeatureZNodeIncompatibleBytes.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
index a84082b55e..9f63aa73e1 100644
--- a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
@@ -19,20 +19,18 @@ package kafka.server
import java.util.{Optional, Properties}
import java.util.concurrent.ExecutionException
-
import kafka.utils.TestUtils
import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
import org.apache.kafka.common.errors.InvalidRequestException
-import org.apache.kafka.common.feature.FinalizedVersionRange
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.message.UpdateFeaturesRequestData
import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV0
+import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV0, IBP_3_2_IV0}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertThrows, assertTrue}
@@ -52,8 +50,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3))))
}
- private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
- Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2))))
+ private def defaultFinalizedFeatures(): Map[String, Short] = {
+ Utils.mkMap(Utils.mkEntry("feature_1", 2.toShort)).asScala.toMap
}
private def updateSupportedFeatures(
@@ -84,9 +82,9 @@ class UpdateFeaturesTest extends BaseRequestTest {
updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
}
- private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = {
+ private def updateFeatureZNode(features: Map[String, Short]): Int = {
val server = serverForId(0).get
- val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+ val newNode = FeatureZNode(IBP_2_7_IV0, FeatureZNodeStatus.Enabled, features)
val newVersion = server.zkClient.updateFeatureZNode(newNode)
servers.foreach(s => {
s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs)
@@ -100,11 +98,11 @@ class UpdateFeaturesTest extends BaseRequestTest {
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
}
- private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Features[FinalizedVersionRange] = {
- Features.finalizedFeatures(features.asScala.map {
+ private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Map[String, Short] = {
+ features.asScala.map {
case(name, versionRange) =>
- (name, new FinalizedVersionRange(versionRange.minVersionLevel(), versionRange.maxVersionLevel()))
- }.asJava)
+ (name, versionRange.maxVersionLevel())
+ }.toMap
}
private def supportedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.SupportedVersionRange]): Features[SupportedVersionRange] = {
@@ -116,7 +114,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
private def checkFeatures(client: Admin,
expectedNode: FeatureZNode,
- expectedFinalizedFeatures: Features[FinalizedVersionRange],
+ expectedFinalizedFeatures: Map[String, Short],
expectedFinalizedFeaturesEpoch: Long,
expectedSupportedFeatures: Features[SupportedVersionRange]): Unit = {
assertEquals(expectedNode, getFeatureZNode())
@@ -183,8 +181,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
val nodeBefore = getFeatureZNode()
val validUpdates = new FeatureUpdateKeyCollection()
- val validUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey();
- validUpdate.setFeature("feature_1");
+ val validUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey()
+ validUpdate.setFeature("feature_1")
validUpdate.setMaxVersionLevel(defaultSupportedFeatures().get("feature_1").max())
validUpdate.setAllowDowngrade(false)
validUpdates.add(validUpdate)
@@ -210,7 +208,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
*/
@Test
def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
- val targetMaxVersionLevel = (defaultFinalizedFeatures().get("feature_1").max() - 1).asInstanceOf[Short]
+ val targetMaxVersionLevel = (defaultFinalizedFeatures()("feature_1") - 1).asInstanceOf[Short]
testWithInvalidFeatureUpdate[InvalidRequestException](
"feature_1",
new FeatureUpdate(targetMaxVersionLevel, FeatureUpdate.UpgradeType.UPGRADE),
@@ -223,11 +221,11 @@ class UpdateFeaturesTest extends BaseRequestTest {
*/
@Test
def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = {
- val targetMaxVersionLevel = (defaultFinalizedFeatures().get("feature_1").max() + 1).asInstanceOf[Short]
+ val targetMaxVersionLevel = (defaultFinalizedFeatures()("feature_1") + 1).asInstanceOf[Short]
testWithInvalidFeatureUpdate[InvalidRequestException](
"feature_1",
new FeatureUpdate(targetMaxVersionLevel, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
- ".*When the downgradeType is set to SAFE set in the request, the provided maxVersionLevel:3.*existing maxVersionLevel:2.*".r)
+ ".*When the downgradeType is set to SAFE in the request, the provided versionLevel:3.*existing versionLevel:2.*".r)
}
/**
@@ -245,14 +243,14 @@ class UpdateFeaturesTest extends BaseRequestTest {
val nodeBefore = getFeatureZNode()
val invalidUpdates
- = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
- val invalidUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey();
+ = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection()
+ val invalidUpdate = new UpdateFeaturesRequestData.FeatureUpdateKey()
invalidUpdate.setFeature("feature_1")
invalidUpdate.setMaxVersionLevel(0)
invalidUpdate.setAllowDowngrade(false)
- invalidUpdates.add(invalidUpdate);
+ invalidUpdates.add(invalidUpdate)
val requestData = new UpdateFeaturesRequestData()
- requestData.setFeatureUpdates(invalidUpdates);
+ requestData.setFeatureUpdates(invalidUpdates)
val response = connectAndReceive[UpdateFeaturesResponse](
new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData().setFeatureUpdates(invalidUpdates)).build(),
@@ -264,7 +262,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
assertNotNull(result.errorMessage)
assertFalse(result.errorMessage.isEmpty)
- val exceptionMsgPattern = ".*Can not provide maxVersionLevel: 0 less than 1.*".r
+ val exceptionMsgPattern = ".*Can not provide versionLevel: 0 less than 1.*".r
assertTrue(exceptionMsgPattern.findFirstIn(result.errorMessage).isDefined, result.errorMessage)
checkFeatures(
adminClient,
@@ -292,7 +290,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
*/
@Test
def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
- val targetMaxVersionLevel = defaultFinalizedFeatures().get("feature_1").max()
+ val targetMaxVersionLevel = defaultFinalizedFeatures()("feature_1")
testWithInvalidFeatureUpdate[InvalidRequestException](
"feature_1",
new FeatureUpdate(targetMaxVersionLevel, FeatureUpdate.UpgradeType.UPGRADE),
@@ -302,7 +300,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
private def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibility(
featureName: String,
supportedVersionRange: SupportedVersionRange,
- initialFinalizedVersionRange: Option[FinalizedVersionRange]
+ initialFinalizedVersionRange: Option[Short]
): Unit = {
TestUtils.waitUntilControllerElected(zkClient)
@@ -327,8 +325,8 @@ class UpdateFeaturesTest extends BaseRequestTest {
updateSupportedFeatures(supportedFeaturesWithVersionIncompatibility, brokersWithVersionIncompatibility)
val initialFinalizedFeatures = initialFinalizedVersionRange.map(
- versionRange => Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(featureName, versionRange)))
- ).getOrElse(Features.emptyFinalizedFeatures())
+ versionRange => Utils.mkMap(Utils.mkEntry(featureName, versionRange)).asScala.toMap
+ ).getOrElse(Map.empty[String, Short])
val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
val invalidUpdate = new FeatureUpdate(supportedVersionRange.max(), FeatureUpdate.UpgradeType.UPGRADE)
@@ -358,7 +356,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibility(
feature,
defaultSupportedFeatures().get(feature),
- Some(defaultFinalizedFeatures().get(feature)))
+ Some(defaultFinalizedFeatures()(feature)))
}
/**
@@ -389,14 +387,13 @@ class UpdateFeaturesTest extends BaseRequestTest {
Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))
updateSupportedFeaturesInAllBrokers(supportedFeatures)
- val versionBefore = updateFeatureZNode(Features.emptyFinalizedFeatures())
+ val versionBefore = updateFeatureZNode(Map.empty)
- val targetFinalizedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
- Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
- val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), FeatureUpdate.UpgradeType.UPGRADE)
- val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), FeatureUpdate.UpgradeType.UPGRADE)
+ val targetFinalizedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", 3.toShort),
+ Utils.mkEntry("feature_2", 3.toShort)).asScala.toMap
+ val update1 = new FeatureUpdate(targetFinalizedFeatures("feature_1"), FeatureUpdate.UpgradeType.UPGRADE)
+ val update2 = new FeatureUpdate(targetFinalizedFeatures("feature_2"), FeatureUpdate.UpgradeType.UPGRADE)
val adminClient = createAdminClient()
adminClient.updateFeatures(
@@ -406,7 +403,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
checkFeatures(
adminClient,
- new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
+ FeatureZNode(IBP_2_7_IV0, FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
targetFinalizedFeatures,
versionBefore + 1,
supportedFeatures)
@@ -425,21 +422,19 @@ class UpdateFeaturesTest extends BaseRequestTest {
Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))
updateSupportedFeaturesInAllBrokers(supportedFeatures)
- val initialFinalizedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
- Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
+ val initialFinalizedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", 2.toShort),
+ Utils.mkEntry("feature_2", 4.toShort)).asScala.toMap
val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
// Below we aim to do the following:
// - Valid upgrade of feature_1 maxVersionLevel from 2 to 3
// - Valid downgrade of feature_2 maxVersionLevel from 4 to 3
- val targetFinalizedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
- Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
- val update1 = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), FeatureUpdate.UpgradeType.UPGRADE)
- val update2 = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
+ val targetFinalizedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", 3.toShort),
+ Utils.mkEntry("feature_2", 3.toShort)).asScala.toMap
+ val update1 = new FeatureUpdate(targetFinalizedFeatures("feature_1"), FeatureUpdate.UpgradeType.UPGRADE)
+ val update2 = new FeatureUpdate(targetFinalizedFeatures("feature_2"), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
val adminClient = createAdminClient()
adminClient.updateFeatures(
@@ -449,7 +444,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
checkFeatures(
adminClient,
- new FeatureZNode(FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
+ FeatureZNode(IBP_2_7_IV0, FeatureZNodeStatus.Enabled, targetFinalizedFeatures),
targetFinalizedFeatures,
versionBefore + 1,
supportedFeatures)
@@ -469,22 +464,20 @@ class UpdateFeaturesTest extends BaseRequestTest {
Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new SupportedVersionRange(2, 5))))
updateSupportedFeaturesInAllBrokers(supportedFeatures)
- val initialFinalizedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
- Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
+ val initialFinalizedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", 2.toShort),
+ Utils.mkEntry("feature_2", 4.toShort)).asScala.toMap
val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
// Below we aim to do the following:
// - Valid upgrade of feature_1 maxVersionLevel from 2 to 3
// - Invalid downgrade of feature_2 maxVersionLevel from 4 to 3
// (because we intentionally do not set the allowDowngrade flag)
- val targetFinalizedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
- Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
- val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), FeatureUpdate.UpgradeType.UPGRADE)
- val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), FeatureUpdate.UpgradeType.UPGRADE)
+ val targetFinalizedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", 3.toShort),
+ Utils.mkEntry("feature_2", 3.toShort)).asScala.toMap
+ val validUpdate = new FeatureUpdate(targetFinalizedFeatures("feature_1"), FeatureUpdate.UpgradeType.UPGRADE)
+ val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures("feature_2"), FeatureUpdate.UpgradeType.UPGRADE)
val adminClient = createAdminClient()
val result = adminClient.updateFeatures(
@@ -496,13 +489,12 @@ class UpdateFeaturesTest extends BaseRequestTest {
// Expect update for "feature_2" to have failed.
checkException[InvalidRequestException](
result, Map("feature_2" -> ".*Can not downgrade finalized feature.*".r))
- val expectedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", targetFinalizedFeatures.get("feature_1")),
- Utils.mkEntry("feature_2", initialFinalizedFeatures.get("feature_2"))))
+ val expectedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", targetFinalizedFeatures("feature_1")),
+ Utils.mkEntry("feature_2", initialFinalizedFeatures("feature_2"))).asScala.toMap
checkFeatures(
adminClient,
- FeatureZNode(FeatureZNodeStatus.Enabled, expectedFeatures),
+ FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, expectedFeatures),
expectedFeatures,
versionBefore + 1,
supportedFeatures)
@@ -537,22 +529,20 @@ class UpdateFeaturesTest extends BaseRequestTest {
Utils.mkEntry("feature_2", supportedFeatures.get("feature_2"))))
updateSupportedFeatures(supportedFeaturesWithVersionIncompatibility, brokersWithVersionIncompatibility)
- val initialFinalizedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", new FinalizedVersionRange(2, 2)),
- Utils.mkEntry("feature_2", new FinalizedVersionRange(4, 4))))
+ val initialFinalizedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", 2.toShort),
+ Utils.mkEntry("feature_2", 4.toShort)).asScala.toMap
val versionBefore = updateFeatureZNode(initialFinalizedFeatures)
// Below we aim to do the following:
// - Invalid upgrade of feature_1 maxVersionLevel from 2 to 3
// (because one of the brokers does not support the max version: 3)
// - Valid downgrade of feature_2 maxVersionLevel from 4 to 3
- val targetFinalizedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", new FinalizedVersionRange(3, 3)),
- Utils.mkEntry("feature_2", new FinalizedVersionRange(3, 3))))
- val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_1").max(), FeatureUpdate.UpgradeType.UPGRADE)
- val validUpdate = new FeatureUpdate(targetFinalizedFeatures.get("feature_2").max(), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
+ val targetFinalizedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", 3.toShort),
+ Utils.mkEntry("feature_2", 3.toShort)).asScala.toMap
+ val invalidUpdate = new FeatureUpdate(targetFinalizedFeatures("feature_1"), FeatureUpdate.UpgradeType.UPGRADE)
+ val validUpdate = new FeatureUpdate(targetFinalizedFeatures("feature_2"), FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)
val adminClient = createAdminClient()
val result = adminClient.updateFeatures(
@@ -563,13 +553,12 @@ class UpdateFeaturesTest extends BaseRequestTest {
result.values().get("feature_2").get()
// Expect update for "feature_1" to have failed.
checkException[InvalidRequestException](result, Map("feature_1" -> ".*brokers.*incompatible.*".r))
- val expectedFeatures = Features.finalizedFeatures(
- Utils.mkMap(
- Utils.mkEntry("feature_1", initialFinalizedFeatures.get("feature_1")),
- Utils.mkEntry("feature_2", targetFinalizedFeatures.get("feature_2"))))
+ val expectedFeatures = Utils.mkMap(
+ Utils.mkEntry("feature_1", initialFinalizedFeatures("feature_1")),
+ Utils.mkEntry("feature_2", targetFinalizedFeatures("feature_2"))).asScala.toMap
checkFeatures(
adminClient,
- FeatureZNode(FeatureZNodeStatus.Enabled, expectedFeatures),
+ FeatureZNode(IBP_3_2_IV0, FeatureZNodeStatus.Enabled, expectedFeatures),
expectedFeatures,
versionBefore + 1,
supportedFeatures)
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 455e051541..7f3c12122e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -152,7 +152,7 @@ public enum MetadataVersion {
// Support for leader recovery for unclean leader election (KIP-704)
IBP_3_2_IV0(4, "3.2", "IV0", false),
- // Support for metadata.version feature flag (KIP-778)
+ // Support for metadata.version feature flag and Removes min_version_level from the finalized version range that is written to ZooKeeper (KIP-778)
IBP_3_3_IV0(5, "3.3", "IV0", false);
public static final String FEATURE_NAME = "metadata.version";
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 8a825e3da2..bbce1b6ae4 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -59,6 +59,7 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -186,6 +187,9 @@ class MetadataVersionTest {
assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));
+
+ assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3"));
+ assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3-IV0"));
}
@Test
@@ -228,7 +232,7 @@ class MetadataVersionTest {
assertEquals("3.0", IBP_3_0_IV0.shortVersion());
assertEquals("3.0", IBP_3_0_IV1.shortVersion());
assertEquals("3.1", IBP_3_1_IV0.shortVersion());
- assertEquals("3.2", IBP_3_2_IV0.shortVersion());
+ assertEquals("3.3", IBP_3_3_IV0.shortVersion());
}
@Test
@@ -261,6 +265,7 @@ class MetadataVersionTest {
assertEquals("3.0-IV1", IBP_3_0_IV1.version());
assertEquals("3.1-IV0", IBP_3_1_IV0.version());
assertEquals("3.2-IV0", IBP_3_2_IV0.version());
+ assertEquals("3.3-IV0", IBP_3_3_IV0.version());
}
@Test