You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/06/01 23:33:11 UTC

[kafka] branch trunk updated: MINOR: implement BrokerRegistrationChangeRecord (#12195)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 65b4374203 MINOR: implement BrokerRegistrationChangeRecord (#12195)
65b4374203 is described below

commit 65b4374203608dc4d68544bde16d4acd2e31efd1
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Wed Jun 1 16:33:01 2022 -0700

    MINOR: implement BrokerRegistrationChangeRecord (#12195)
    
    Implement BrokerRegistrationChangeRecord as specified in KIP-746. This is a more flexible record than the
    single-purpose Fence / Unfence records.
    
    Reviewers: José Armando García Sancio <js...@gmail.com>, dengziming <de...@gmail.com>
---
 .../kafka/controller/ClusterControlManager.java    | 79 ++++++++++++-------
 .../apache/kafka/controller/QuorumController.java  |  7 +-
 .../controller/ReplicationControlManager.java      | 89 ++++++++++++++--------
 .../metadata/BrokerRegistrationFencingChange.java  | 56 ++++++++++++++
 .../controller/ClusterControlManagerTest.java      | 37 +++++++--
 .../controller/FeatureControlManagerTest.java      |  4 +-
 .../kafka/controller/QuorumControllerTest.java     |  2 +-
 .../controller/ReplicationControlManagerTest.java  |  2 -
 .../BrokerRegistrationFencingChangeTest.java       | 50 ++++++++++++
 .../kafka/server/common/MetadataVersion.java       |  9 ++-
 .../kafka/server/common/MetadataVersionTest.java   |  6 +-
 11 files changed, 265 insertions(+), 76 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 90dfb571b1..5c069e12e7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
@@ -32,10 +33,12 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeatureCollection;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.VersionRange;
@@ -379,7 +382,7 @@ public class ClusterControlManager {
         if (registration == null) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration found for that id", record.toString()));
-        } else if (registration.epoch() !=  record.brokerEpoch()) {
+        } else if (registration.epoch() != record.brokerEpoch()) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration with that epoch found", record.toString()));
         } else {
@@ -391,41 +394,56 @@ public class ClusterControlManager {
     }
 
     public void replay(FenceBrokerRecord record) {
-        int brokerId = record.id();
-        BrokerRegistration registration = brokerRegistrations.get(brokerId);
-        if (registration == null) {
-            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration found for that id", record.toString()));
-        } else if (registration.epoch() !=  record.epoch()) {
-            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration with that epoch found", record.toString()));
-        } else {
-            if (heartbeatManager != null) heartbeatManager.register(brokerId, true);
-            brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
-            updateMetrics(registration, brokerRegistrations.get(brokerId));
-            log.info("Fenced broker: {}", record);
-        }
+        replayRegistrationChange(record, record.id(), record.epoch(), // delegate to the shared replay path
+            BrokerRegistrationFencingChange.UNFENCE); // NOTE(review): UNFENCE is declared with asBoolean() == true, so this yields cloneWithFencing(true), i.e. the broker is fenced; the constant name reads inverted
     }
 
     public void replay(UnfenceBrokerRecord record) {
-        int brokerId = record.id();
-        BrokerRegistration registration = brokerRegistrations.get(brokerId);
-        if (registration == null) {
+        replayRegistrationChange(record, record.id(), record.epoch(), // delegate to the shared replay path
+            BrokerRegistrationFencingChange.FENCE); // NOTE(review): FENCE is declared with asBoolean() == false, so this yields cloneWithFencing(false), i.e. the broker is unfenced; the constant name reads inverted
+    }
+
+    public void replay(BrokerRegistrationChangeRecord record) { // decode the record's fenced byte, then apply it through the shared replay path
+        Optional<BrokerRegistrationFencingChange> fencingChange =
+            BrokerRegistrationFencingChange.fromValue(record.fenced()); // empty when the byte matches no enum constant
+        if (!fencingChange.isPresent()) { // unknown byte: refuse to replay rather than guess at the intent
+            throw new RuntimeException(String.format("Unable to replay %s: unknown " +
+                "value for fenced field: %d", record.toString(), record.fenced()));
+        }
+        replayRegistrationChange(record, record.brokerId(), record.brokerEpoch(),
+            fencingChange.get()); // presence was checked above
+    }
+
+    private void replayRegistrationChange( // shared by the FenceBrokerRecord, UnfenceBrokerRecord and BrokerRegistrationChangeRecord replay overloads
+        ApiMessage record, // used only to render the error messages below
+        int brokerId, // key into brokerRegistrations
+        long brokerEpoch, // must equal the epoch of the current registration
+        BrokerRegistrationFencingChange fencingChange // NONE leaves the fenced flag untouched
+    ) {
+        BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
+        if (curRegistration == null) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration found for that id", record.toString()));
-        } else if (registration.epoch() !=  record.epoch()) {
+        } else if (curRegistration.epoch() != brokerEpoch) { // the change targets a different incarnation of this broker id
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration with that epoch found", record.toString()));
         } else {
-            if (heartbeatManager != null) heartbeatManager.register(brokerId, false);
-            brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
-            updateMetrics(registration, brokerRegistrations.get(brokerId));
-            log.info("Unfenced broker: {}", record);
-        }
-        if (readyBrokersFuture.isPresent()) {
-            if (readyBrokersFuture.get().check()) {
-                readyBrokersFuture.get().future.complete(null);
-                readyBrokersFuture = Optional.empty();
+            BrokerRegistration nextRegistration = curRegistration;
+            if (fencingChange != BrokerRegistrationFencingChange.NONE) {
+                nextRegistration = nextRegistration.cloneWithFencing(fencingChange.asBoolean().get()); // get() is safe: only NONE is declared with an empty asBoolean
+            }
+            if (!curRegistration.equals(nextRegistration)) { // write the map and adjust metrics only when something changed
+                brokerRegistrations.put(brokerId, nextRegistration);
+                updateMetrics(curRegistration, nextRegistration);
+            } else {
+                log.info("Ignoring no-op registration change for {}", curRegistration);
+            }
+            if (heartbeatManager != null) heartbeatManager.register(brokerId, nextRegistration.fenced()); // runs for no-op changes too
+            if (readyBrokersFuture.isPresent()) { // a caller may be waiting on the ready-brokers future
+                if (readyBrokersFuture.get().check()) {
+                    readyBrokersFuture.get().future.complete(null);
+                    readyBrokersFuture = Optional.empty(); // cleared once the future has been completed
+                }
             }
         }
     }
@@ -437,19 +455,24 @@ public class ClusterControlManager {
             } else {
                 controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
             }
+            log.info("Removed broker: {}", prevRegistration.id());
         } else if (prevRegistration == null) {
             if (registration.fenced()) {
                 controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
+                log.info("Added new fenced broker: {}", registration.id());
             } else {
                 controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
+                log.info("Added new unfenced broker: {}", registration.id());
             }
         } else {
             if (prevRegistration.fenced() && !registration.fenced()) {
                 controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
                 controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
+                log.info("Unfenced broker: {}", registration.id());
             } else if (!prevRegistration.fenced() && registration.fenced()) {
                 controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
                 controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
+                log.info("Fenced broker: {}", registration.id());
             }
         }
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 97a2040a38..e921ac4067 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
 import org.apache.kafka.common.message.UpdateFeaturesResponseData;
 import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -1262,7 +1263,6 @@ public final class QuorumController implements Controller {
                     break;
                 case FEATURE_LEVEL_RECORD:
                     featureControl.replay((FeatureLevelRecord) message);
-
                     handleFeatureControlChange();
                     break;
                 case CLIENT_QUOTA_RECORD:
@@ -1271,6 +1271,9 @@ public final class QuorumController implements Controller {
                 case PRODUCER_IDS_RECORD:
                     producerIdControlManager.replay((ProducerIdsRecord) message);
                     break;
+                case BROKER_REGISTRATION_CHANGE_RECORD:
+                    clusterControl.replay((BrokerRegistrationChangeRecord) message);
+                    break;
                 case ACCESS_CONTROL_ENTRY_RECORD:
                     aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
                     break;
@@ -1573,7 +1576,6 @@ public final class QuorumController implements Controller {
         this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
         this.maxIdleIntervalNs = maxIdleIntervalNs;
         this.replicationControl = new ReplicationControlManager.Builder().
-            setMetadataVersion(() -> featureControl.metadataVersion()).
             setSnapshotRegistry(snapshotRegistry).
             setLogContext(logContext).
             setDefaultReplicationFactor(defaultReplicationFactor).
@@ -1583,6 +1585,7 @@ public final class QuorumController implements Controller {
             setClusterControl(clusterControl).
             setControllerMetrics(controllerMetrics).
             setCreateTopicPolicy(createTopicPolicy).
+            setFeatureControl(featureControl).
             build();
         this.authorizer = authorizer;
         authorizer.ifPresent(a -> a.setAclMutator(this));
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 81ec253434..13e41c978c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.common.ElectionType;
@@ -63,6 +64,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.Lis
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
@@ -75,6 +77,7 @@ import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -112,12 +115,9 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
-import static org.apache.kafka.common.metadata.MetadataRecordType.FENCE_BROKER_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
-import static org.apache.kafka.common.metadata.MetadataRecordType.UNFENCE_BROKER_RECORD;
-import static org.apache.kafka.common.metadata.MetadataRecordType.UNREGISTER_BROKER_RECORD;
 import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
 import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
 import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
@@ -139,7 +139,6 @@ public class ReplicationControlManager {
     static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
 
     static class Builder {
-        private Supplier<MetadataVersion> metadataVersion = MetadataVersion::latest;
         private SnapshotRegistry snapshotRegistry = null;
         private LogContext logContext = null;
         private short defaultReplicationFactor = (short) 3;
@@ -149,11 +148,7 @@ public class ReplicationControlManager {
         private ClusterControlManager clusterControl = null;
         private ControllerMetrics controllerMetrics = null;
         private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
-
-        Builder setMetadataVersion(Supplier<MetadataVersion> metadataVersion) {
-            this.metadataVersion = metadataVersion;
-            return this;
-        }
+        private FeatureControlManager featureControl = null;
 
         Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
             this.snapshotRegistry = snapshotRegistry;
@@ -200,6 +195,11 @@ public class ReplicationControlManager {
             return this;
         }
 
+        public Builder setFeatureControl(FeatureControlManager featureControl) { // optional: build() substitutes a default FeatureControlManager when this stays null
+            this.featureControl = featureControl;
+            return this;
+        }
+
         ReplicationControlManager build() {
             if (configurationControl == null) {
                 throw new IllegalStateException("Configuration control must be set before building");
@@ -210,9 +210,17 @@ public class ReplicationControlManager {
             }
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
-            return new ReplicationControlManager(
-                metadataVersion,
-                snapshotRegistry,
+            if (featureControl == null) {
+                featureControl = new FeatureControlManager.Builder().
+                    setLogContext(logContext).
+                    setSnapshotRegistry(snapshotRegistry).
+                    setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                        QuorumFeatures.defaultFeatureMap(),
+                        Collections.singletonList(0))).
+                    setMetadataVersion(MetadataVersion.latest()).
+                    build();
+            }
+            return new ReplicationControlManager(snapshotRegistry,
                 logContext,
                 defaultReplicationFactor,
                 defaultNumPartitions,
@@ -220,7 +228,8 @@ public class ReplicationControlManager {
                 configurationControl,
                 clusterControl,
                 controllerMetrics,
-                createTopicPolicy);
+                createTopicPolicy,
+                featureControl);
         }
     }
 
@@ -275,11 +284,6 @@ public class ReplicationControlManager {
      */
     private final int defaultNumPartitions;
 
-    /**
-     * Metadata version (or IBP) for the cluster
-     */
-    private final Supplier<MetadataVersion> metadataVersion;
-
     /**
      * Maximum number of leader elections to perform during one partition leader balancing operation.
      */
@@ -310,6 +314,11 @@ public class ReplicationControlManager {
      */
     private final Optional<CreateTopicPolicy> createTopicPolicy;
 
+    /**
+     * The feature control manager.
+     */
+    private final FeatureControlManager featureControl;
+
     /**
      * Maps topic names to topic UUIDs.
      */
@@ -358,7 +367,6 @@ public class ReplicationControlManager {
     final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
 
     private ReplicationControlManager(
-        Supplier<MetadataVersion> metadataVersion,
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
         short defaultReplicationFactor,
@@ -367,9 +375,9 @@ public class ReplicationControlManager {
         ConfigurationControlManager configurationControl,
         ClusterControlManager clusterControl,
         ControllerMetrics controllerMetrics,
-        Optional<CreateTopicPolicy> createTopicPolicy
+        Optional<CreateTopicPolicy> createTopicPolicy,
+        FeatureControlManager featureControl
     ) {
-        this.metadataVersion = metadataVersion;
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(ReplicationControlManager.class);
         this.defaultReplicationFactor = defaultReplicationFactor;
@@ -378,6 +386,7 @@ public class ReplicationControlManager {
         this.configurationControl = configurationControl;
         this.controllerMetrics = controllerMetrics;
         this.createTopicPolicy = createTopicPolicy;
+        this.featureControl = featureControl;
         this.clusterControl = clusterControl;
         this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
         this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -929,7 +938,7 @@ public class ReplicationControlManager {
                     topic.id,
                     partitionId,
                     r -> clusterControl.unfenced(r),
-                    metadataVersion.get().isLeaderRecoverySupported());
+                    featureControl.metadataVersion().isLeaderRecoverySupported());
                 if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())) {
                     builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
@@ -1095,9 +1104,16 @@ public class ReplicationControlManager {
         }
         generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
             brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
-        records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
-            setId(brokerId).setEpoch(brokerRegistration.epoch()),
-            FENCE_BROKER_RECORD.highestSupportedVersion()));
+        if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) {
+            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                    setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).
+                    setFenced(BrokerRegistrationFencingChange.UNFENCE.value()),
+                    (short) 0));
+        } else {
+            records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
+                    setId(brokerId).setEpoch(brokerRegistration.epoch()),
+                    (short) 0));
+        }
     }
 
     /**
@@ -1116,7 +1132,7 @@ public class ReplicationControlManager {
             brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
         records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
             setBrokerId(brokerId).setBrokerEpoch(brokerEpoch),
-            UNREGISTER_BROKER_RECORD.highestSupportedVersion()));
+            (short) 0));
     }
 
     /**
@@ -1131,8 +1147,15 @@ public class ReplicationControlManager {
      * @param records       The record list to append to.
      */
     void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
-        records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).
-            setEpoch(brokerEpoch), UNFENCE_BROKER_RECORD.highestSupportedVersion()));
+        if (featureControl.metadataVersion().isBrokerRegistrationChangeRecordSupported()) { // emit the generic change record when the metadata version supports it
+            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
+                setFenced(BrokerRegistrationFencingChange.FENCE.value()), // NOTE(review): FENCE.value() is -1, which replay maps to cloneWithFencing(false), i.e. unfenced; the constant name reads inverted
+                (short) 0));
+        } else { // otherwise fall back to the dedicated UnfenceBrokerRecord
+            records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().setId(brokerId).
+                setEpoch(brokerEpoch), (short) 0));
+        }
         generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, brokerId, records,
             brokersToIsrs.partitionsWithNoLeader());
     }
@@ -1223,7 +1246,7 @@ public class ReplicationControlManager {
             topicId,
             partitionId,
             r -> clusterControl.unfenced(r),
-            metadataVersion.get().isLeaderRecoverySupported());
+            featureControl.metadataVersion().isLeaderRecoverySupported());
         builder.setElection(election);
         Optional<ApiMessageAndVersion> record = builder.build();
         if (!record.isPresent()) {
@@ -1339,7 +1362,7 @@ public class ReplicationControlManager {
                 topicPartition.topicId(),
                 topicPartition.partitionId(),
                 r -> clusterControl.unfenced(r),
-                metadataVersion.get().isLeaderRecoverySupported()
+                featureControl.metadataVersion().isLeaderRecoverySupported()
             );
             builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
             builder.build().ifPresent(records::add);
@@ -1542,7 +1565,7 @@ public class ReplicationControlManager {
                 topicIdPart.topicId(),
                 topicIdPart.partitionId(),
                 isAcceptableLeader,
-                metadataVersion.get().isLeaderRecoverySupported());
+                featureControl.metadataVersion().isLeaderRecoverySupported());
             if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
                 builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
             }
@@ -1652,7 +1675,7 @@ public class ReplicationControlManager {
             tp.topicId(),
             tp.partitionId(),
             r -> clusterControl.unfenced(r),
-            metadataVersion.get().isLeaderRecoverySupported());
+            featureControl.metadataVersion().isLeaderRecoverySupported());
         if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
         }
@@ -1704,7 +1727,7 @@ public class ReplicationControlManager {
             tp.topicId(),
             tp.partitionId(),
             r -> clusterControl.unfenced(r),
-            metadataVersion.get().isLeaderRecoverySupported());
+            featureControl.metadataVersion().isLeaderRecoverySupported());
         if (!reassignment.merged().equals(currentReplicas)) {
             builder.setTargetReplicas(reassignment.merged());
         }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
new file mode 100644
index 0000000000..1c2729355f
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+public enum BrokerRegistrationFencingChange { // change to a broker's fenced flag, encoded as one byte in BrokerRegistrationChangeRecord.fenced
+    FENCE(-1, Optional.of(false)), // NOTE(review): byte -1 with asBoolean() == false, so replay calls cloneWithFencing(false) and the broker ends up unfenced despite the name
+    NONE(0, Optional.empty()), // byte 0: leave the fenced flag untouched
+    UNFENCE(1, Optional.of(true)); // NOTE(review): byte 1 with asBoolean() == true, so replay calls cloneWithFencing(true) and the broker ends up fenced despite the name
+    // Byte written to and read from the record's fenced field.
+    private final byte value;
+    // Fenced-flag value this change produces; empty means no change.
+    private final Optional<Boolean> asBoolean;
+    // Reverse lookup from byte value to constant, built once from values().
+    private final static Map<Byte, BrokerRegistrationFencingChange> VALUE_TO_ENUM =
+        Arrays.stream(BrokerRegistrationFencingChange.values()).
+                collect(Collectors.toMap(v -> Byte.valueOf(v.value()), Function.identity()));
+    // Returns the constant for the given byte, or Optional.empty() when no constant uses it.
+    public static Optional<BrokerRegistrationFencingChange> fromValue(byte value) {
+        return Optional.ofNullable(VALUE_TO_ENUM.get(value));
+    }
+
+    BrokerRegistrationFencingChange(int value, Optional<Boolean> asBoolean) {
+        this.value = (byte) value; // every declared constant passes a value that fits in a byte
+        this.asBoolean = asBoolean;
+    }
+
+    public Optional<Boolean> asBoolean() {
+        return asBoolean;
+    }
+
+    public byte value() {
+        return value;
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index a173cf5f94..590578f6c6 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -23,14 +23,17 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -43,12 +46,15 @@ import org.apache.kafka.metadata.placement.ClusterDescriber;
 import org.apache.kafka.metadata.placement.PlacementSpec;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -57,8 +63,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class ClusterControlManagerTest {
-    @Test
-    public void testReplay() {
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV0", "IBP_3_3_IV2"})
+    public void testReplay(MetadataVersion metadataVersion) {
         MockTime time = new MockTime(0, 0, 0);
 
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
@@ -86,11 +93,29 @@ public class ClusterControlManagerTest {
         assertFalse(clusterControl.unfenced(0));
         assertFalse(clusterControl.unfenced(1));
 
-        UnfenceBrokerRecord unfenceBrokerRecord =
-            new UnfenceBrokerRecord().setId(1).setEpoch(100);
-        clusterControl.replay(unfenceBrokerRecord);
+        if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
+            UnfenceBrokerRecord unfenceBrokerRecord =
+                    new UnfenceBrokerRecord().setId(1).setEpoch(100);
+            clusterControl.replay(unfenceBrokerRecord);
+        } else {
+            BrokerRegistrationChangeRecord changeRecord =
+                    new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced((byte) -1);
+            clusterControl.replay(changeRecord);
+        }
         assertFalse(clusterControl.unfenced(0));
         assertTrue(clusterControl.unfenced(1));
+
+        if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
+            FenceBrokerRecord fenceBrokerRecord =
+                    new FenceBrokerRecord().setId(1).setEpoch(100);
+            clusterControl.replay(fenceBrokerRecord);
+        } else {
+            BrokerRegistrationChangeRecord changeRecord =
+                    new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced((byte) 1);
+            clusterControl.replay(changeRecord);
+        }
+        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.unfenced(1));
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 3a8c4042df..fd1e5af39e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -251,11 +251,11 @@ public class FeatureControlManagerTest {
 
 
         result = manager.updateFeatures(
-            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
+            Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_1_IV0.featureLevel()),
             Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
             Collections.emptyMap(),
             true);
-        assertEquals(Errors.NONE, result.response().get(MetadataVersion.FEATURE_NAME).error());
+        assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
 
         result = manager.updateFeatures(
                 Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV0.featureLevel()),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index eef7215070..4e898b924c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -718,7 +718,7 @@ public class QuorumControllerTest {
                                             setPartitionIndex(1).
                                             setBrokerIds(Arrays.asList(1, 2, 0))).
                                                 iterator()))).iterator())),
-                        Collections.singleton(topicName)).get();
+                        Collections.singleton(topicName)).get(60, TimeUnit.SECONDS);
                 }
                 logEnv.waitForLatestSnapshot();
             }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index abc90912a1..d7141a76dd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -76,7 +76,6 @@ import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
@@ -165,7 +164,6 @@ public class ReplicationControlManagerTest {
 
         ReplicationControlTestContext(Optional<CreateTopicPolicy> createTopicPolicy) {
             this.replicationControl = new ReplicationControlManager.Builder().
-                setMetadataVersion(() -> MetadataVersion.IBP_3_3_IV1).
                 setSnapshotRegistry(snapshotRegistry).
                 setLogContext(logContext).
                 setMaxElectionsPerImbalance(Integer.MAX_VALUE).
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationFencingChangeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationFencingChangeTest.java
new file mode 100644
index 0000000000..9266192f24
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationFencingChangeTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(40)
+public class BrokerRegistrationFencingChangeTest {
+    @Test
+    public void testValues() {
+        assertEquals((byte) -1, BrokerRegistrationFencingChange.UNFENCE.value());
+        assertEquals((byte) 0, BrokerRegistrationFencingChange.NONE.value());
+        assertEquals((byte) 1, BrokerRegistrationFencingChange.FENCE.value());
+    }
+
+    @Test
+    public void testAsBoolean() {
+        assertEquals(Optional.of(false), BrokerRegistrationFencingChange.UNFENCE.asBoolean());
+        assertEquals(Optional.empty(), BrokerRegistrationFencingChange.NONE.asBoolean());
+        assertEquals(Optional.of(true), BrokerRegistrationFencingChange.FENCE.asBoolean());
+    }
+
+    @Test
+    public void testValueRoundTrip() {
+        for (BrokerRegistrationFencingChange change : BrokerRegistrationFencingChange.values()) {
+            assertEquals(Optional.of(change), BrokerRegistrationFencingChange.fromValue(change.value()));
+        }
+    }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 920c60300d..ee19faf88b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -156,7 +156,10 @@ public enum MetadataVersion {
     IBP_3_3_IV0(5, "3.3", "IV0", false),
 
     // Support NoopRecord for the cluster metadata log (KIP-835)
-    IBP_3_3_IV1(6, "3.3", "IV1", true);
+    IBP_3_3_IV1(6, "3.3", "IV1", true),
+
+    // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
+    IBP_3_3_IV2(7, "3.3", "IV2", true);
 
     public static final String FEATURE_NAME = "metadata.version";
 
@@ -236,6 +239,10 @@ public enum MetadataVersion {
         }
     }
 
+    public boolean isBrokerRegistrationChangeRecordSupported() {
+        return this.isAtLeast(IBP_3_3_IV2);
+    }
+
     private static final Map<String, MetadataVersion> IBP_VERSIONS;
     static {
         {
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 8649586be9..07a644e729 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -61,6 +61,7 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -189,9 +190,10 @@ class MetadataVersionTest {
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));
 
-        assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3"));
+        assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3"));
         assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3-IV0"));
         assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3-IV1"));
+        assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3-IV2"));
     }
 
     @Test
@@ -237,6 +239,7 @@ class MetadataVersionTest {
         assertEquals("3.2", IBP_3_2_IV0.shortVersion());
         assertEquals("3.3", IBP_3_3_IV0.shortVersion());
         assertEquals("3.3", IBP_3_3_IV1.shortVersion());
+        assertEquals("3.3", IBP_3_3_IV2.shortVersion());
     }
 
     @Test
@@ -271,6 +274,7 @@ class MetadataVersionTest {
         assertEquals("3.2-IV0", IBP_3_2_IV0.version());
         assertEquals("3.3-IV0", IBP_3_3_IV0.version());
         assertEquals("3.3-IV1", IBP_3_3_IV1.version());
+        assertEquals("3.3-IV2", IBP_3_3_IV2.version());
     }
 
     @Test