You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/06/01 23:09:54 UTC

[kafka] branch trunk updated: MINOR: Several fixes and improvements for FeatureControlManager (#12207)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ca9cd4d2d MINOR: Several fixes and improvements for FeatureControlManager (#12207)
0ca9cd4d2d is described below

commit 0ca9cd4d2d2a89510dbe357783a316f2f0789799
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Wed Jun 1 16:09:38 2022 -0700

    MINOR: Several fixes and improvements for FeatureControlManager (#12207)
    
    This PR fixes a bug where FeatureControlManager#replay(FeatureLevelRecord) was throwing an
    exception if not all controllers in the quorum supported the feature being applied. While we do
    want to validate this, it needs to be validated earlier, before the record is committed to the log.
    Once the record has been committed to the log it should always be applied if the current controller
    supports it.
    
    Fix another bug where removing a feature was not supported once it had been configured. Note that
    because we reserve feature level 0 for "feature not enabled", we don't need to use
    Optional<VersionRange>; we can just return a range of 0-0 when the feature is not supported.
    
    Allow the metadata version to be downgraded when UpgradeType.UNSAFE_DOWNGRADE has been set.
    Previously, downgrades that changed metadata were unconditionally denied, even when
    UNSAFE_DOWNGRADE was set.
    
    Add a builder for FeatureControlManager, so that we can easily add new parameters to the
    constructor in the future. This will also be useful for creating FeatureControlManagers that are
    initialized to a specific MetadataVersion.
    
    Get rid of RemoveFeatureLevelRecord, since it's easier to just issue a FeatureLevelRecord with
    the level set to 0.
    
    Set metadata.max.idle.interval.ms to 0 in RaftClusterSnapshotTest for more predictability.
    
    Reviewers: David Arthur <mu...@gmail.com>, dengziming <de...@gmail.com>
---
 .../kafka/server/RaftClusterSnapshotTest.scala     |   8 +-
 .../kafka/controller/FeatureControlManager.java    | 134 ++++++++++----------
 .../apache/kafka/controller/QuorumController.java  |   6 +-
 .../apache/kafka/controller/QuorumFeatures.java    |  78 ++++++------
 .../java/org/apache/kafka/image/FeaturesDelta.java |  15 +--
 .../java/org/apache/kafka/image/MetadataDelta.java |   8 --
 .../common/metadata/RemoveFeatureLevelRecord.json  |  26 ----
 .../controller/FeatureControlManagerTest.java      | 139 ++++++++++++---------
 .../kafka/controller/QuorumFeaturesTest.java       | 108 +++++++---------
 .../org/apache/kafka/image/FeaturesImageTest.java  |  15 ++-
 10 files changed, 257 insertions(+), 280 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index e34a5ed6ed..503ce7d2be 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -21,6 +21,7 @@ import java.util.Collections
 import kafka.testkit.KafkaClusterTestKit
 import kafka.testkit.TestKitNodes
 import kafka.utils.TestUtils
+import kafka.server.KafkaConfig.{MetadataMaxIdleIntervalMsProp, MetadataSnapshotMaxNewRecordBytesProp}
 import org.apache.kafka.common.utils.BufferSupplier
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.snapshot.RecordsSnapshotReader
@@ -38,7 +39,6 @@ class RaftClusterSnapshotTest {
   def testSnapshotsGenerated(): Unit = {
     val numberOfBrokers = 3
     val numberOfControllers = 3
-    val metadataSnapshotMaxNewRecordBytes = 100
 
     TestUtils.resource(
       new KafkaClusterTestKit
@@ -48,10 +48,8 @@ class RaftClusterSnapshotTest {
             .setNumControllerNodes(numberOfControllers)
             .build()
         )
-        .setConfigProp(
-          KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp,
-          metadataSnapshotMaxNewRecordBytes.toString
-        )
+        .setConfigProp(MetadataSnapshotMaxNewRecordBytesProp, "10")
+        .setConfigProp(MetadataMaxIdleIntervalMsProp, "0")
         .build()
     ) { cluster =>
       cluster.format()
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 52d3c5d521..19127afa72 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -29,6 +29,7 @@ import java.util.Optional;
 import java.util.TreeMap;
 import java.util.function.Consumer;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.protocol.Errors;
@@ -47,6 +48,46 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
 
 
 public class FeatureControlManager {
+    public static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private QuorumFeatures quorumFeatures = null;
+        private MetadataVersion metadataVersion = MetadataVersion.UNINITIALIZED;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
+            this.quorumFeatures = quorumFeatures;
+            return this;
+        }
+
+        Builder setMetadataVersion(MetadataVersion metadataVersion) {
+            this.metadataVersion = metadataVersion;
+            return this;
+        }
+
+        public FeatureControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
+            if (quorumFeatures == null) {
+                quorumFeatures = new QuorumFeatures(0, new ApiVersions(), QuorumFeatures.defaultFeatureMap(),
+                        Collections.emptyList());
+            }
+            return new FeatureControlManager(logContext,
+                quorumFeatures,
+                snapshotRegistry,
+                metadataVersion);
+        }
+    }
+
     private final Logger log;
 
     /**
@@ -65,13 +106,16 @@ public class FeatureControlManager {
     private final TimelineObject<MetadataVersion> metadataVersion;
 
 
-    FeatureControlManager(LogContext logContext,
-                          QuorumFeatures quorumFeatures,
-                          SnapshotRegistry snapshotRegistry) {
+    private FeatureControlManager(
+        LogContext logContext,
+        QuorumFeatures quorumFeatures,
+        SnapshotRegistry snapshotRegistry,
+        MetadataVersion metadataVersion
+    ) {
         this.log = logContext.logger(FeatureControlManager.class);
         this.quorumFeatures = quorumFeatures;
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.metadataVersion = new TimelineObject<>(snapshotRegistry, MetadataVersion.UNINITIALIZED);
+        this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
@@ -94,35 +138,6 @@ public class FeatureControlManager {
         }
     }
 
-    ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short initVersion) {
-        if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
-            return ControllerResult.atomicOf(
-                Collections.emptyList(),
-                Collections.singletonMap(
-                    MetadataVersion.FEATURE_NAME,
-                    new ApiError(Errors.INVALID_UPDATE_VERSION,
-                        "Cannot initialize metadata.version to " + initVersion + " since it has already been " +
-                            "initialized to " + metadataVersion().featureLevel() + ".")
-            ));
-        }
-        List<ApiMessageAndVersion> records = new ArrayList<>();
-        ApiError result = updateMetadataVersion(initVersion, false, records::add);
-        return ControllerResult.atomicOf(records, Collections.singletonMap(MetadataVersion.FEATURE_NAME, result));
-    }
-
-    /**
-     * Test if the quorum can support this feature and version
-     */
-    boolean canSupportVersion(String featureName, short version) {
-        return quorumFeatures.quorumSupportedFeature(featureName)
-            .filter(versionRange -> versionRange.contains(version))
-            .isPresent();
-    }
-
-    boolean featureExists(String featureName) {
-        return quorumFeatures.localSupportedFeature(featureName).isPresent();
-    }
-
     MetadataVersion metadataVersion() {
         return metadataVersion.get();
     }
@@ -134,11 +149,6 @@ public class FeatureControlManager {
         Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
         List<ApiMessageAndVersion> records
     ) {
-        if (!featureExists(featureName)) {
-            return invalidUpdateVersion(featureName, newVersion,
-                "The controller does not support the given feature.");
-        }
-
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
             return invalidUpdateVersion(featureName, newVersion,
                 "The controller does not support the given upgrade type.");
@@ -151,14 +161,14 @@ public class FeatureControlManager {
             currentVersion = finalizedVersions.get(featureName);
         }
 
-        if (newVersion <= 0) {
+        if (newVersion < 0) {
             return invalidUpdateVersion(featureName, newVersion,
-                "A feature version cannot be less than 1.");
+                "A feature version cannot be less than 0.");
         }
 
-        if (!canSupportVersion(featureName, newVersion)) {
-            return invalidUpdateVersion(featureName, newVersion,
-                "The quorum does not support the given feature version.");
+        Optional<String> reasonNotSupported = quorumFeatures.reasonNotSupported(featureName, newVersion);
+        if (reasonNotSupported.isPresent()) {
+            return invalidUpdateVersion(featureName, newVersion, reasonNotSupported.get());
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) {
@@ -208,19 +218,6 @@ public class FeatureControlManager {
         boolean allowUnsafeDowngrade,
         Consumer<ApiMessageAndVersion> recordConsumer
     ) {
-        Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
-        if (!quorumSupported.isPresent()) {
-            return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version.");
-        }
-
-        if (newVersionLevel <= 0) {
-            return invalidMetadataVersion(newVersionLevel, "KRaft mode/the quorum does not support metadata.version values less than 1.");
-        }
-
-        if (!quorumSupported.get().contains(newVersionLevel)) {
-            return invalidMetadataVersion(newVersionLevel, "The controller quorum does support this version.");
-        }
-
         MetadataVersion currentVersion = metadataVersion();
         final MetadataVersion newVersion;
         try {
@@ -234,9 +231,15 @@ public class FeatureControlManager {
             boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
             if (!metadataChanged) {
                 log.info("Downgrading metadata.version from {} to {}.", currentVersion, newVersion);
+            } else if (allowUnsafeDowngrade) {
+                log.info("Downgrading metadata.version unsafely from {} to {}.", currentVersion, newVersion);
             } else {
-                return invalidMetadataVersion(newVersionLevel, "Unsafe metadata.version downgrades are not supported.");
+                return invalidMetadataVersion(newVersionLevel, "Refusing to perform the requested " +
+                        "downgrade because it might delete metadata information. Retry using " +
+                        "UNSAFE_DOWNGRADE if you want to force the downgrade to proceed.");
             }
+        } else {
+            log.info("Upgrading metadata.version from {} to {}.", currentVersion, newVersion);
         }
 
         recordConsumer.accept(new ApiMessageAndVersion(
@@ -264,17 +267,22 @@ public class FeatureControlManager {
     }
 
     public void replay(FeatureLevelRecord record) {
-        if (!canSupportVersion(record.name(), record.featureLevel())) {
-            throw new RuntimeException("Controller cannot support feature " + record.name() +
-                                       " at version " + record.featureLevel());
+        VersionRange range = quorumFeatures.localSupportedFeature(record.name());
+        if (!range.contains(record.featureLevel())) {
+            throw new RuntimeException("Tried to apply FeatureLevelRecord " + record + ", but this controller only " +
+                "supports versions " + range);
         }
-
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
             log.info("Setting metadata.version to {}", record.featureLevel());
             metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
         } else {
-            log.info("Setting feature {} to {}", record.name(), record.featureLevel());
-            finalizedVersions.put(record.name(), record.featureLevel());
+            if (record.featureLevel() == 0) {
+                log.info("Removing feature {}", record.name());
+                finalizedVersions.remove(record.name());
+            } else {
+                log.info("Setting feature {} to {}", record.name(), record.featureLevel());
+                finalizedVersions.put(record.name(), record.featureLevel());
+            }
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7ca557dada..97a2040a38 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1563,7 +1563,11 @@ public final class QuorumController implements Controller {
             setReplicaPlacer(replicaPlacer).
             setControllerMetrics(controllerMetrics).
             build();
-        this.featureControl = new FeatureControlManager(logContext, quorumFeatures, snapshotRegistry);
+        this.featureControl = new FeatureControlManager.Builder().
+                setLogContext(logContext).
+                setQuorumFeatures(quorumFeatures).
+                setSnapshotRegistry(snapshotRegistry).
+                build();
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
         this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 80a5c0a0b1..9b723515bd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -33,40 +33,41 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.OptionalInt;
 import java.util.stream.Collectors;
 
 /**
  * A holder class of the local node's supported feature flags as well as the ApiVersions of other nodes.
  */
 public class QuorumFeatures {
+    private static final VersionRange DISABLED = VersionRange.of(0, 0);
+
     private static final Logger log = LoggerFactory.getLogger(QuorumFeatures.class);
 
     private final int nodeId;
     private final ApiVersions apiVersions;
-    private final Map<String, VersionRange> supportedFeatures;
+    private final Map<String, VersionRange> localSupportedFeatures;
     private final List<Integer> quorumNodeIds;
 
     QuorumFeatures(
         int nodeId,
         ApiVersions apiVersions,
-        Map<String, VersionRange> supportedFeatures,
+        Map<String, VersionRange> localSupportedFeatures,
         List<Integer> quorumNodeIds
     ) {
         this.nodeId = nodeId;
         this.apiVersions = apiVersions;
-        this.supportedFeatures = Collections.unmodifiableMap(supportedFeatures);
+        this.localSupportedFeatures = Collections.unmodifiableMap(localSupportedFeatures);
         this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds);
     }
 
     public static QuorumFeatures create(
         int nodeId,
         ApiVersions apiVersions,
-        Map<String, VersionRange> supportedFeatures,
+        Map<String, VersionRange> localSupportedFeatures,
         Collection<Node> quorumNodes
     ) {
         List<Integer> nodeIds = quorumNodes.stream().map(Node::id).collect(Collectors.toList());
-        return new QuorumFeatures(nodeId, apiVersions, supportedFeatures, nodeIds);
+        return new QuorumFeatures(nodeId, apiVersions, localSupportedFeatures, nodeIds);
     }
 
     public static Map<String, VersionRange> defaultFeatureMap() {
@@ -75,47 +76,50 @@ public class QuorumFeatures {
         return features;
     }
 
-    Optional<VersionRange> quorumSupportedFeature(String featureName) {
-        List<VersionRange> supportedVersions = new ArrayList<>(quorumNodeIds.size());
-        for (int nodeId : quorumNodeIds) {
-            if (nodeId == this.nodeId) {
-                // We get this node's features from "supportedFeatures"
-                continue;
+    /**
+     * Return the reason a specific feature level is not supported, or Optional.empty if it is supported.
+     *
+     * @param featureName   The feature name.
+     * @param level         The feature level.
+     * @return              The reason why the feature level is not supported, or Optional.empty if it is supported.
+     */
+    public Optional<String> reasonNotSupported(String featureName, short level) {
+        VersionRange localRange = localSupportedFeatures.getOrDefault(featureName, DISABLED);
+        if (!localRange.contains(level)) {
+            if (localRange.equals(DISABLED)) {
+                return Optional.of("Local controller " + nodeId + " does not support this feature.");
+            } else {
+                return Optional.of("Local controller " + nodeId + " only supports versions " + localRange);
             }
-            NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(nodeId));
+        }
+        List<String> missing = new ArrayList<>();
+        for (int id : quorumNodeIds) {
+            if (nodeId == id) {
+                continue; // We get the local node's features from localSupportedFeatures.
+            }
+            NodeApiVersions nodeVersions = apiVersions.get(Integer.toString(id));
             if (nodeVersions == null) {
+                missing.add(Integer.toString(id));
                 continue;
             }
             SupportedVersionRange supportedRange = nodeVersions.supportedFeatures().get(featureName);
-            if (supportedRange == null) {
-                supportedVersions.add(VersionRange.of(0, 0));
-            } else {
-                supportedVersions.add(VersionRange.of(supportedRange.min(), supportedRange.max()));
-            }
-        }
-        localSupportedFeature(featureName).ifPresent(supportedVersions::add);
-
-        if (supportedVersions.isEmpty()) {
-            return Optional.empty();
-        } else {
-            OptionalInt highestMinVersion = supportedVersions.stream().mapToInt(VersionRange::min).max();
-            OptionalInt lowestMaxVersion = supportedVersions.stream().mapToInt(VersionRange::max).min();
-            if (highestMinVersion.isPresent() && lowestMaxVersion.isPresent()) {
-                if (highestMinVersion.getAsInt() <= lowestMaxVersion.getAsInt()) {
-                    if (supportedVersions.size() < quorumNodeIds.size()) {
-                        log.info("Using incomplete set of quorum supported features.");
-                    }
-                    return Optional.of(VersionRange.of((short) highestMinVersion.getAsInt(), (short) lowestMaxVersion.getAsInt()));
+            VersionRange range = supportedRange == null ? DISABLED :
+                    VersionRange.of(supportedRange.min(), supportedRange.max());
+            if (!range.contains(level)) {
+                if (range.equals(DISABLED)) {
+                    return Optional.of("Controller " + id + " does not support this feature.");
                 } else {
-                    return Optional.empty();
+                    return Optional.of("Controller " + id + " only supports versions " + range);
                 }
-            } else {
-                return Optional.empty();
             }
         }
+        if (!missing.isEmpty()) {
+            log.info("Unable to get feature level information for controller(s): " + String.join(", ", missing));
+        }
+        return Optional.empty();
     }
 
-    Optional<VersionRange> localSupportedFeature(String featureName) {
-        return Optional.ofNullable(supportedFeatures.get(featureName));
+    VersionRange localSupportedFeature(String featureName) {
+        return localSupportedFeatures.getOrDefault(featureName, DISABLED);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 28eb187bfb..7f431c2d06 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
@@ -61,15 +60,11 @@ public final class FeaturesDelta {
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
             metadataVersionChange = MetadataVersion.fromFeatureLevel(record.featureLevel());
         } else {
-            changes.put(record.name(), Optional.of(record.featureLevel()));
-        }
-    }
-
-    public void replay(RemoveFeatureLevelRecord record) {
-        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
-            metadataVersionChange = null;
-        } else {
-            changes.put(record.name(), Optional.empty());
+            if (record.featureLevel() == 0) {
+                changes.put(record.name(), Optional.empty());
+            } else {
+                changes.put(record.name(), Optional.of(record.featureLevel()));
+            }
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 01455e360c..25e141ea0d 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
@@ -204,9 +203,6 @@ public final class MetadataDelta {
             case PRODUCER_IDS_RECORD:
                 replay((ProducerIdsRecord) record);
                 break;
-            case REMOVE_FEATURE_LEVEL_RECORD:
-                replay((RemoveFeatureLevelRecord) record);
-                break;
             case BROKER_REGISTRATION_CHANGE_RECORD:
                 replay((BrokerRegistrationChangeRecord) record);
                 break;
@@ -291,10 +287,6 @@ public final class MetadataDelta {
         getOrCreateProducerIdsDelta().replay(record);
     }
 
-    public void replay(RemoveFeatureLevelRecord record) {
-        getOrCreateFeaturesDelta().replay(record);
-    }
-
     public void replay(AccessControlEntryRecord record) {
         getOrCreateAclsDelta().replay(record);
     }
diff --git a/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json b/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json
deleted file mode 100644
index 6ed716192e..0000000000
--- a/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-{
-  "apiKey": 16,
-  "type": "metadata",
-  "name": "RemoveFeatureLevelRecord",
-  "validVersions": "0",
-  "flexibleVersions": "0+",
-  "fields": [
-    { "name": "Name", "type": "string", "versions": "0+",
-      "about": "The feature name." }
-  ]
-}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 02b1493548..3a8c4042df 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.admin.FeatureUpdate;
@@ -39,6 +40,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import static java.util.Collections.emptyList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 
@@ -70,7 +72,7 @@ public class FeatureControlManagerTest {
     public static QuorumFeatures features(Object... args) {
         Map<String, VersionRange> features = QuorumFeatures.defaultFeatureMap();
         features.putAll(rangeMap(args));
-        return new QuorumFeatures(0, new ApiVersions(), features, Collections.emptyList());
+        return new QuorumFeatures(0, new ApiVersions(), features, emptyList());
     }
 
     private static Map<String, Short> updateMap(Object... args) {
@@ -85,16 +87,17 @@ public class FeatureControlManagerTest {
 
     @Test
     public void testUpdateFeatures() {
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(features("foo", 1, 2)).
+            setSnapshotRegistry(snapshotRegistry).
+            build();
         snapshotRegistry.getOrCreateSnapshot(-1);
-        FeatureControlManager manager = new FeatureControlManager(logContext,
-            features("foo", 1, 2), snapshotRegistry);
         assertEquals(new FinalizedControllerFeatures(Collections.emptyMap(), -1),
             manager.finalizedFeatures(-1));
-        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
+        assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Invalid update version 3 for feature foo. The quorum does not support the given feature version."))),
+                    "Invalid update version 3 for feature foo. Local controller 0 only supports versions 1-2"))),
             manager.updateFeatures(updateMap("foo", 3),
                 Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
                 Collections.emptyMap(), false));
@@ -104,7 +107,7 @@ public class FeatureControlManagerTest {
         Map<String, ApiError> expectedMap = new HashMap<>();
         expectedMap.put("foo", ApiError.NONE);
         expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "Invalid update version 1 for feature bar. The controller does not support the given feature."));
+                "Invalid update version 1 for feature bar. Local controller 0 does not support this feature."));
         assertEquals(expectedMap, result.response());
         List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
         expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
@@ -121,8 +124,11 @@ public class FeatureControlManagerTest {
             setName("foo").setFeatureLevel((short) 2);
 
         snapshotRegistry.getOrCreateSnapshot(-1);
-        FeatureControlManager manager = new FeatureControlManager(logContext,
-            features("foo", 1, 2), snapshotRegistry);
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+                setLogContext(logContext).
+                setQuorumFeatures(features("foo", 1, 2)).
+                setSnapshotRegistry(snapshotRegistry).
+                build();
         manager.replay(record);
         snapshotRegistry.getOrCreateSnapshot(123);
         assertEquals(new FinalizedControllerFeatures(versionMap("foo", 2), 123),
@@ -133,12 +139,15 @@ public class FeatureControlManagerTest {
     public void testUpdateFeaturesErrorCases() {
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager(logContext,
-            features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setLogContext(logContext).
+            setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
+            setSnapshotRegistry(snapshotRegistry).
+            build();
 
         assertEquals(
             ControllerResult.atomicOf(
-                Collections.emptyList(),
+                emptyList(),
                 Collections.singletonMap(
                     "foo",
                     new ApiError(
@@ -160,7 +169,7 @@ public class FeatureControlManagerTest {
         manager.replay((FeatureLevelRecord) result.records().get(0).message());
         snapshotRegistry.getOrCreateSnapshot(3);
 
-        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
+        assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "Invalid update version 2 for feature foo. Can't downgrade the version of this feature " +
                     "without setting the upgrade type to either safe or unsafe downgrade."))),
@@ -191,8 +200,11 @@ public class FeatureControlManagerTest {
     public void testFeatureControlIterator() throws Exception {
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager(logContext,
-            features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setLogContext(logContext).
+            setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
+            setSnapshotRegistry(snapshotRegistry).
+            build();
         ControllerResult<Map<String, ApiError>> result = manager.
             updateFeatures(updateMap("foo", 5, "bar", 1),
                 Collections.emptyMap(), Collections.emptyMap(), false);
@@ -208,57 +220,28 @@ public class FeatureControlManagerTest {
     }
 
     @Test
-    public void testInitializeMetadataVersion() {
-        // Default QuorumFeatures
-        checkMetadataVersion(features(), MetadataVersion.IBP_3_0_IV0, Errors.NONE);
-        checkMetadataVersion(features(), MetadataVersion.latest(), Errors.NONE);
-        checkMetadataVersion(features(), MetadataVersion.UNINITIALIZED, Errors.INVALID_UPDATE_VERSION);
-        checkMetadataVersion(features(), MetadataVersion.IBP_2_7_IV1, Errors.INVALID_UPDATE_VERSION);
-
-        // Increased QuorumFeatures
-        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
-        checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0, Errors.INVALID_UPDATE_VERSION);
-
-        // Empty QuorumFeatures
-        features = new QuorumFeatures(0, new ApiVersions(), Collections.emptyMap(), Collections.emptyList());
-        checkMetadataVersion(features, MetadataVersion.latest(), Errors.INVALID_UPDATE_VERSION);
-        checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0, Errors.INVALID_UPDATE_VERSION);
-    }
-
-    @Test
-    public void reInitializeMetadataVersion() {
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager(logContext, features(), snapshotRegistry);
-        ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(MetadataVersion.IBP_3_0_IV0.featureLevel());
-        Errors actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
-        assertEquals(Errors.NONE, actual);
-        RecordTestUtils.replayAll(manager, result.records());
-
-        result = manager.initializeMetadataVersion(MetadataVersion.latest().featureLevel());
-        actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
-        assertEquals(Errors.INVALID_UPDATE_VERSION, actual);
-    }
-
-    public void checkMetadataVersion(QuorumFeatures features, MetadataVersion version, Errors expected) {
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager(logContext, features, snapshotRegistry);
-        ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(version.featureLevel());
-        Errors actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
-        assertEquals(expected, actual);
+    public void testApplyMetadataVersionChangeRecord() {
+        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
+                MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(features).build();
+        manager.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_0_IV0.featureLevel()));
+        assertEquals(MetadataVersion.IBP_3_0_IV0, manager.metadataVersion());
     }
 
     @Test
     public void testDowngradeMetadataVersion() {
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
-        FeatureControlManager manager = new FeatureControlManager(logContext, features, snapshotRegistry);
-        ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(MetadataVersion.IBP_3_3_IV0.featureLevel());
-        RecordTestUtils.replayAll(manager, result.records());
+        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
+                MetadataVersion.IBP_3_2_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(features).
+            setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
+            build();
         assertEquals(manager.metadataVersion(), MetadataVersion.IBP_3_3_IV0);
 
+        ControllerResult<Map<String, ApiError>> result;
         result = manager.updateFeatures(
             Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
             Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
@@ -280,7 +263,39 @@ public class FeatureControlManagerTest {
                 Collections.emptyMap(),
                 true);
         assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
-        assertEquals("Invalid update version 1 for feature metadata.version. The quorum does not support the given feature version.",
+        assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 4-5",
             result.response().get(MetadataVersion.FEATURE_NAME).message());
     }
+
+    @Test
+    public void testCreateFeatureLevelRecords() {
+        Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
+        localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
+                MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
+        localSupportedFeatures.put("foo", VersionRange.of(0, 2));
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+                setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(), localSupportedFeatures, emptyList())).
+                build();
+        ControllerResult<Map<String, ApiError>> result  = manager.updateFeatures(
+                Collections.singletonMap("foo", (short) 1),
+                Collections.singletonMap("foo", FeatureUpdate.UpgradeType.UPGRADE),
+                Collections.singletonMap(1, Collections.singletonMap("foo", VersionRange.of(0, 3))),
+                false);
+        assertEquals(ControllerResult.atomicOf(Arrays.asList(new ApiMessageAndVersion(
+                new FeatureLevelRecord().setName("foo").setFeatureLevel((short) 1), (short) 0)),
+                        Collections.singletonMap("foo", ApiError.NONE)), result);
+        RecordTestUtils.replayAll(manager, result.records());
+        assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
+
+        ControllerResult<Map<String, ApiError>> result2  = manager.updateFeatures(
+                Collections.singletonMap("foo", (short) 0),
+                Collections.singletonMap("foo", FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+                Collections.singletonMap(1, Collections.singletonMap("foo", VersionRange.of(0, 3))),
+                false);
+        assertEquals(ControllerResult.atomicOf(Arrays.asList(new ApiMessageAndVersion(
+                        new FeatureLevelRecord().setName("foo").setFeatureLevel((short) 0), (short) 0)),
+                Collections.singletonMap("foo", ApiError.NONE)), result2);
+        RecordTestUtils.replayAll(manager, result2.records());
+        assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
index 0194cd674e..7d8ba5bfec 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -19,86 +19,74 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
 import org.apache.kafka.metadata.VersionRange;
 import org.junit.jupiter.api.Test;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 
+import static java.util.Collections.emptyMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class QuorumFeaturesTest {
-    @Test
-    public void testQuorumFeatures() {
-        ApiVersions apiVersions = new ApiVersions();
-        Map<String, VersionRange> featureMap = new HashMap<>(2);
-        featureMap.put("foo", VersionRange.of(1, 2));
-        featureMap.put("bar", VersionRange.of(3, 5));
-
-        List<Integer> nodeIds = new ArrayList<>();
-        nodeIds.add(0);
-
-        QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, featureMap, nodeIds);
-        assertLocalFeature(quorumFeatures, "foo", 1, 2);
-        assertLocalFeature(quorumFeatures, "bar", 3, 5);
-        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
-        assertQuorumFeature(quorumFeatures, "bar", 3, 5);
+    private final static Map<String, VersionRange> LOCAL;
 
-        // Add a second node with identical features
-        nodeIds.add(1);
-        apiVersions.update("1", nodeApiVersions(featureMap));
-        assertLocalFeature(quorumFeatures, "foo", 1, 2);
-        assertLocalFeature(quorumFeatures, "bar", 3, 5);
-        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
-        assertQuorumFeature(quorumFeatures, "bar", 3, 5);
-
-        // Change the supported features of one node
-        Map<String, VersionRange> node1Features = new HashMap<>(featureMap);
-        node1Features.put("bar", VersionRange.of(3, 4));
-        apiVersions.update("1", nodeApiVersions(node1Features));
-        assertLocalFeature(quorumFeatures, "foo", 1, 2);
-        assertLocalFeature(quorumFeatures, "bar", 3, 5);
-        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
-        assertQuorumFeature(quorumFeatures, "bar", 3, 4);
-
-        // Add a third node with no features
-        nodeIds.add(2);
-        apiVersions.update("1", NodeApiVersions.create());
-        assertFalse(quorumFeatures.quorumSupportedFeature("foo").isPresent());
-        assertFalse(quorumFeatures.quorumSupportedFeature("bar").isPresent());
+    static {
+        Map<String, VersionRange> local = new HashMap<>();
+        local.put("foo", VersionRange.of(0, 3));
+        local.put("bar", VersionRange.of(0, 4));
+        local.put("baz", VersionRange.of(2, 2));
+        LOCAL = Collections.unmodifiableMap(local);
     }
 
-
-    public static NodeApiVersions nodeApiVersions(Map<String, VersionRange> featureMap) {
-        List<ApiVersionsResponseData.SupportedFeatureKey> supportedFeatures = new ArrayList<>(featureMap.size());
-        featureMap.forEach((featureName, versionRange) -> {
-            supportedFeatures.add(new ApiVersionsResponseData.SupportedFeatureKey()
-                .setName(featureName)
-                .setMinVersion(versionRange.min())
-                .setMaxVersion(versionRange.max()));
-        });
-        return new NodeApiVersions(Collections.emptyList(), supportedFeatures);
+    @Test
+    public void testDefaultSupportedLevels() {
+        QuorumFeatures quorumFeatures = new QuorumFeatures(0, new ApiVersions(), emptyMap(), Arrays.asList(0, 1, 2));
+        assertEquals(Optional.empty(), quorumFeatures.reasonNotSupported("foo", (short) 0));
+        assertEquals(Optional.of("Local controller 0 does not support this feature."),
+            quorumFeatures.reasonNotSupported("foo", (short) 1));
     }
 
-    private void assertLocalFeature(QuorumFeatures features, String name, int expectedMin, int expectedMax) {
-        Optional<VersionRange> featureRange = features.localSupportedFeature(name);
-        assertTrue(featureRange.isPresent());
-        assertEquals(expectedMin, featureRange.get().min());
-        assertEquals(expectedMax, featureRange.get().max());
+    @Test
+    public void testLocalSupportedFeature() {
+        QuorumFeatures quorumFeatures = new QuorumFeatures(0, new ApiVersions(), LOCAL, Arrays.asList(0, 1, 2));
+        assertEquals(VersionRange.of(0, 3), quorumFeatures.localSupportedFeature("foo"));
+        assertEquals(VersionRange.of(0, 4), quorumFeatures.localSupportedFeature("bar"));
+        assertEquals(VersionRange.of(2, 2), quorumFeatures.localSupportedFeature("baz"));
+        assertEquals(VersionRange.of(0, 0), quorumFeatures.localSupportedFeature("quux"));
     }
 
-    private void assertQuorumFeature(QuorumFeatures features, String name, int expectedMin, int expectedMax) {
-        Optional<VersionRange> featureRange = features.quorumSupportedFeature(name);
-        assertTrue(featureRange.isPresent());
-        assertEquals(expectedMin, featureRange.get().min());
-        assertEquals(expectedMax, featureRange.get().max());
+    @Test
+    public void testReasonNotSupported() {
+        ApiVersions apiVersions = new ApiVersions();
+        QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, LOCAL, Arrays.asList(0, 1, 2));
+        assertEquals(Optional.of("Local controller 0 only supports versions 0-3"),
+                quorumFeatures.reasonNotSupported("foo", (short) 10));
+        apiVersions.update("1", nodeApiVersions(Arrays.asList(
+                new SimpleImmutableEntry<>("foo", VersionRange.of(1, 3)),
+                new SimpleImmutableEntry<>("bar", VersionRange.of(1, 3)),
+                new SimpleImmutableEntry<>("baz", VersionRange.of(1, 2)))));
+        assertEquals(Optional.empty(), quorumFeatures.reasonNotSupported("bar", (short) 3));
+        assertEquals(Optional.of("Controller 1 only supports versions 1-3"),
+                quorumFeatures.reasonNotSupported("bar", (short) 4));
     }
 
+    private static NodeApiVersions nodeApiVersions(List<Entry<String, VersionRange>> entries) {
+        List<SupportedFeatureKey> features = new ArrayList<>();
+        entries.forEach(entry -> {
+            features.add(new SupportedFeatureKey().
+                    setName(entry.getKey()).
+                    setMinVersion(entry.getValue().min()).
+                    setMaxVersion(entry.getValue().max()));
+        });
+        return new NodeApiVersions(Collections.emptyList(), features);
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index 0812048bb0..6ea31b080b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -30,8 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
-import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_FEATURE_LEVEL_RECORD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 
@@ -52,11 +49,13 @@ public class FeaturesImageTest {
         DELTA1_RECORDS = new ArrayList<>();
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
             setName("foo").setFeatureLevel((short) 3),
-            FEATURE_LEVEL_RECORD.highestSupportedVersion()));
-        DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveFeatureLevelRecord().
-            setName("bar"), REMOVE_FEATURE_LEVEL_RECORD.highestSupportedVersion()));
-        DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveFeatureLevelRecord().
-            setName("baz"), REMOVE_FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("bar").setFeatureLevel((short) 0),
+            (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("baz").setFeatureLevel((short) 0),
+            (short) 0));
 
         DELTA1 = new FeaturesDelta(IMAGE1);
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);