Posted to commits@kafka.apache.org by js...@apache.org on 2022/06/07 17:37:41 UTC

[kafka] branch trunk updated: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (#12240)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 151ca12a56 KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (#12240)
151ca12a56 is described below

commit 151ca12a56c854d78be07f9893ef7984277bb5c1
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Jun 7 19:37:20 2022 +0200

    KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (#12240)
    
    This PR implements the first part of KIP-841. Specifically, it implements the following:
    
    1. Adds a new metadata version.
    2. Adds the InControlledShutdown field to the RegisterBrokerRecord and BrokerRegistrationChangeRecord and bumps their versions. The new versions are only used when the new metadata version is enabled.
    3. Writes a BrokerRegistrationChangeRecord with InControlledShutdown set when a broker requests a controlled shutdown.
    4. Ensures that replicas on brokers that are fenced or in controlled shutdown are neither picked as leaders nor included in the ISR (see the sketch below).
    5. Adds or extends unit tests.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, dengziming <de...@gmail.com>, David Arthur <mu...@gmail.com>
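
The sketch below illustrates the eligibility rule described in point 4 of the list above: a replica is only kept in the ISR (and only considered for leadership) when its broker is neither fenced nor in controlled shutdown. It is a self-contained toy, not code from this commit; the names IsrEligibilitySketch, SimpleRegistration, and isrFor are invented for the example, and only the active() predicate mirrors the new ClusterControlManager#active check.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class IsrEligibilitySketch {
        // Minimal stand-in for the liveness flags carried by a broker registration.
        record SimpleRegistration(boolean fenced, boolean inControlledShutdown) {
            boolean active() {
                // A broker is usable only when neither flag is set, mirroring the
                // ClusterControlManager#active check added by this PR.
                return !fenced && !inControlledShutdown;
            }
        }

        // Keep only the replicas whose brokers are active. In the real controller an
        // empty result is rejected (INVALID_REPLICA_ASSIGNMENT or
        // INVALID_REPLICATION_FACTOR, depending on the code path).
        static List<Integer> isrFor(List<Integer> replicas,
                                    Map<Integer, SimpleRegistration> brokers) {
            return replicas.stream()
                .filter(id -> brokers.containsKey(id) && brokers.get(id).active())
                .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            Map<Integer, SimpleRegistration> brokers = Map.of(
                0, new SimpleRegistration(false, false),  // active
                1, new SimpleRegistration(true, false),   // fenced
                2, new SimpleRegistration(false, true));  // in controlled shutdown
            System.out.println(isrFor(Arrays.asList(0, 1, 2), brokers)); // prints [0]
        }
    }
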
---
 .../metadata/BrokerMetadataSnapshotter.scala       |   2 -
 .../metadata/BrokerMetadataListenerTest.scala      |   4 +-
 .../kafka/controller/ClusterControlManager.java    | 131 +++++++--
 .../apache/kafka/controller/QuorumController.java  |  18 +-
 .../controller/ReplicationControlManager.java      | 103 +++++---
 .../java/org/apache/kafka/image/ClusterDelta.java  |  36 ++-
 .../java/org/apache/kafka/image/ClusterImage.java  |   6 +-
 .../java/org/apache/kafka/image/MetadataImage.java |   9 +-
 .../apache/kafka/metadata/BrokerRegistration.java  |  62 ++++-
 ...okerRegistrationInControlledShutdownChange.java |  57 ++++
 .../metadata/BrokerRegistrationChangeRecord.json   |   6 +-
 .../common/metadata/RegisterBrokerRecord.json      |   6 +-
 .../controller/ClusterControlManagerTest.java      | 226 +++++++++++++++-
 .../controller/ProducerIdControlManagerTest.java   |  12 +
 .../kafka/controller/QuorumControllerTest.java     |  24 +-
 .../controller/ReplicationControlManagerTest.java  | 293 +++++++++++++++++++--
 .../org/apache/kafka/image/ClusterImageTest.java   |  20 +-
 .../kafka/metadata/BrokerRegistrationTest.java     |  30 +--
 .../kafka/server/common/MetadataVersion.java       |  17 +-
 .../kafka/server/common/MetadataVersionTest.java   |  25 +-
 20 files changed, 922 insertions(+), 165 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
index f32c4d3238..b5179c32f1 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -17,7 +17,6 @@
 package kafka.server.metadata
 
 import java.util.concurrent.RejectedExecutionException
-
 import kafka.utils.Logging
 import org.apache.kafka.image.MetadataImage
 import org.apache.kafka.common.utils.{LogContext, Time}
@@ -25,7 +24,6 @@ import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.snapshot.SnapshotWriter
 
-
 trait SnapshotWriterBuilder {
   def build(committedOffset: Long,
             committedEpoch: Int,
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 20df47a6a6..948e051337 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -98,11 +98,11 @@ class BrokerMetadataListenerTest {
           assertEquals(200L, newImage.highestOffsetAndEpoch().offset)
           assertEquals(new BrokerRegistration(0, 100L,
             Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg"), Collections.emptyList[Endpoint](),
-            Collections.emptyMap[String, VersionRange](), Optional.empty[String](), false),
+            Collections.emptyMap[String, VersionRange](), Optional.empty[String](), false, false),
             delta.clusterDelta().broker(0))
           assertEquals(new BrokerRegistration(1, 200L,
             Uuid.fromString("QkOQtNKVTYatADcaJ28xDg"), Collections.emptyList[Endpoint](),
-            Collections.emptyMap[String, VersionRange](), Optional.empty[String](), true),
+            Collections.emptyMap[String, VersionRange](), Optional.empty[String](), true, false),
             delta.clusterDelta().broker(1))
         }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 5c069e12e7..21bd01c3b9 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.VersionRange;
@@ -46,6 +47,7 @@ import org.apache.kafka.metadata.placement.ReplicaPlacer;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.slf4j.Logger;
@@ -64,8 +66,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
 
 
 /**
@@ -84,6 +86,7 @@ public class ClusterControlManager {
         private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
         private ReplicaPlacer replicaPlacer = null;
         private ControllerMetrics controllerMetrics = null;
+        private FeatureControlManager featureControl = null;
 
         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -120,8 +123,15 @@ public class ClusterControlManager {
             return this;
         }
 
+        Builder setFeatureControlManager(FeatureControlManager featureControl) {
+            this.featureControl = featureControl;
+            return this;
+        }
+
         ClusterControlManager build() {
-            if (logContext == null) logContext = new LogContext();
+            if (logContext == null) {
+                logContext = new LogContext();
+            }
             if (clusterId == null) {
                 clusterId = Uuid.randomUuid().toString();
             }
@@ -132,7 +142,10 @@ public class ClusterControlManager {
                 replicaPlacer = new StripedReplicaPlacer(new Random());
             }
             if (controllerMetrics == null) {
-                throw new RuntimeException("You must specify controllerMetrics");
+                throw new RuntimeException("You must specify ControllerMetrics");
+            }
+            if (featureControl == null) {
+                throw new RuntimeException("You must specify FeatureControlManager");
             }
             return new ClusterControlManager(logContext,
                 clusterId,
@@ -140,7 +153,9 @@ public class ClusterControlManager {
                 snapshotRegistry,
                 sessionTimeoutNs,
                 replicaPlacer,
-                controllerMetrics);
+                controllerMetrics,
+                featureControl
+            );
         }
     }
 
@@ -218,6 +233,11 @@ public class ClusterControlManager {
      */
     private Optional<ReadyBrokersFuture> readyBrokersFuture;
 
+    /**
+     * The feature control manager.
+     */
+    private final FeatureControlManager featureControl;
+
     private ClusterControlManager(
         LogContext logContext,
         String clusterId,
@@ -225,7 +245,8 @@ public class ClusterControlManager {
         SnapshotRegistry snapshotRegistry,
         long sessionTimeoutNs,
         ReplicaPlacer replicaPlacer,
-        ControllerMetrics metrics
+        ControllerMetrics metrics,
+        FeatureControlManager featureControl
     ) {
         this.logContext = logContext;
         this.clusterId = clusterId;
@@ -237,6 +258,7 @@ public class ClusterControlManager {
         this.heartbeatManager = null;
         this.readyBrokersFuture = Optional.empty();
         this.controllerMetrics = metrics;
+        this.featureControl = featureControl;
     }
 
     ReplicaPlacer replicaPlacer() {
@@ -339,7 +361,8 @@ public class ClusterControlManager {
         heartbeatManager.register(brokerId, record.fenced());
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
-        records.add(new ApiMessageAndVersion(record, REGISTER_BROKER_RECORD.highestSupportedVersion()));
+        records.add(new ApiMessageAndVersion(record, featureControl.metadataVersion().
+            registerBrokerRecordVersion()));
         return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
@@ -361,7 +384,8 @@ public class ClusterControlManager {
         BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
                 new BrokerRegistration(brokerId, record.brokerEpoch(),
                     record.incarnationId(), listeners, features,
-                    Optional.ofNullable(record.rack()), record.fenced()));
+                    Optional.ofNullable(record.rack()), record.fenced(),
+                    record.inControlledShutdown()));
         updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
         if (heartbeatManager != null) {
             if (prevRegistration != null) heartbeatManager.remove(brokerId);
@@ -394,31 +418,49 @@ public class ClusterControlManager {
     }
 
     public void replay(FenceBrokerRecord record) {
-        replayRegistrationChange(record, record.id(), record.epoch(),
-            BrokerRegistrationFencingChange.UNFENCE);
+        replayRegistrationChange(
+            record,
+            record.id(),
+            record.epoch(),
+            BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
+            BrokerRegistrationInControlledShutdownChange.NONE.asBoolean()
+        );
     }
 
     public void replay(UnfenceBrokerRecord record) {
-        replayRegistrationChange(record, record.id(), record.epoch(),
-            BrokerRegistrationFencingChange.FENCE);
+        replayRegistrationChange(
+            record,
+            record.id(),
+            record.epoch(),
+            BrokerRegistrationFencingChange.FENCE.asBoolean(),
+            BrokerRegistrationInControlledShutdownChange.NONE.asBoolean()
+        );
     }
 
     public void replay(BrokerRegistrationChangeRecord record) {
-        Optional<BrokerRegistrationFencingChange> fencingChange =
-            BrokerRegistrationFencingChange.fromValue(record.fenced());
-        if (!fencingChange.isPresent()) {
-            throw new RuntimeException(String.format("Unable to replay %s: unknown " +
-                "value for fenced field: %d", record.toString(), record.fenced()));
-        }
-        replayRegistrationChange(record, record.brokerId(), record.brokerEpoch(),
-            fencingChange.get());
+        BrokerRegistrationFencingChange fencingChange =
+            BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
+                () -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
+                    "value for fenced field: %d", record, record.fenced())));
+        BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
+            BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
+                () -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
+                    "value for inControlledShutdown field: %d", record, record.inControlledShutdown())));
+        replayRegistrationChange(
+            record,
+            record.brokerId(),
+            record.brokerEpoch(),
+            fencingChange.asBoolean(),
+            inControlledShutdownChange.asBoolean()
+        );
     }
 
     private void replayRegistrationChange(
         ApiMessage record,
         int brokerId,
         long brokerEpoch,
-        BrokerRegistrationFencingChange fencingChange
+        Optional<Boolean> fencingChange,
+        Optional<Boolean> inControlledShutdownChange
     ) {
         BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
         if (curRegistration == null) {
@@ -428,10 +470,10 @@ public class ClusterControlManager {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration with that epoch found", record.toString()));
         } else {
-            BrokerRegistration nextRegistration = curRegistration;
-            if (fencingChange != BrokerRegistrationFencingChange.NONE) {
-                nextRegistration = nextRegistration.cloneWithFencing(fencingChange.asBoolean().get());
-            }
+            BrokerRegistration nextRegistration = curRegistration.cloneWith(
+                fencingChange,
+                inControlledShutdownChange
+            );
             if (!curRegistration.equals(nextRegistration)) {
                 brokerRegistrations.put(brokerId, nextRegistration);
                 updateMetrics(curRegistration, nextRegistration);
@@ -485,12 +527,36 @@ public class ClusterControlManager {
             id -> brokerRegistrations.get(id).rack());
     }
 
+    /**
+     * Returns true if the broker is unfenced; Returns false if it is
+     * fenced or if it does not exist.
+     */
     public boolean unfenced(int brokerId) {
         BrokerRegistration registration = brokerRegistrations.get(brokerId);
         if (registration == null) return false;
         return !registration.fenced();
     }
 
+    /**
+     * Returns true if the broker is in controlled shutdown state; Returns false
+     * if it is not or if it does not exist.
+     */
+    public boolean inControlledShutdown(int brokerId) {
+        BrokerRegistration registration = brokerRegistrations.get(brokerId);
+        if (registration == null) return false;
+        return registration.inControlledShutdown();
+    }
+
+    /**
+     * Returns true if the broker is active. Active means not fenced nor in controlled
+     * shutdown; Returns false if it is not active or if it does not exist.
+     */
+    public boolean active(int brokerId) {
+        BrokerRegistration registration = brokerRegistrations.get(brokerId);
+        if (registration == null) return false;
+        return !registration.inControlledShutdown() && !registration.fenced();
+    }
+
     BrokerHeartbeatManager heartbeatManager() {
         if (heartbeatManager == null) {
             throw new RuntimeException("ClusterControlManager is not active.");
@@ -520,9 +586,15 @@ public class ClusterControlManager {
 
     class ClusterControlIterator implements Iterator<List<ApiMessageAndVersion>> {
         private final Iterator<Entry<Integer, BrokerRegistration>> iterator;
+        private final MetadataVersion metadataVersion;
 
         ClusterControlIterator(long epoch) {
             this.iterator = brokerRegistrations.entrySet(epoch).iterator();
+            if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+                this.metadataVersion = MetadataVersion.IBP_3_0_IV1;
+            } else {
+                this.metadataVersion = featureControl.metadataVersion();
+            }
         }
 
         @Override
@@ -549,16 +621,19 @@ public class ClusterControlManager {
                     setMaxSupportedVersion(featureEntry.getValue().max()).
                     setMinSupportedVersion(featureEntry.getValue().min()));
             }
-            List<ApiMessageAndVersion> batch = new ArrayList<>();
-            batch.add(new ApiMessageAndVersion(new RegisterBrokerRecord().
+            RegisterBrokerRecord record = new RegisterBrokerRecord().
                 setBrokerId(brokerId).
                 setIncarnationId(registration.incarnationId()).
                 setBrokerEpoch(registration.epoch()).
                 setEndPoints(endpoints).
                 setFeatures(features).
                 setRack(registration.rack().orElse(null)).
-                setFenced(registration.fenced()), REGISTER_BROKER_RECORD.highestSupportedVersion()));
-            return batch;
+                setFenced(registration.fenced());
+            if (metadataVersion.isInControlledShutdownStateSupported()) {
+                record.setInControlledShutdown(registration.inControlledShutdown());
+            }
+            return singletonList(new ApiMessageAndVersion(record,
+                metadataVersion.registerBrokerRecordVersion()));
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index e921ac4067..0c6d665027 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -622,11 +622,16 @@ public final class QuorumController implements Controller {
         }
     }
 
-    // VisibleForTesting
+    // Visible for testing
     ReplicationControlManager replicationControl() {
         return replicationControl;
     }
 
+    // Visible for testing
+    ClusterControlManager clusterControl() {
+        return clusterControl;
+    }
+
     <T> CompletableFuture<T> appendReadEvent(
         String name,
         OptionalLong deadlineNs,
@@ -1557,6 +1562,11 @@ public final class QuorumController implements Controller {
             setNodeId(nodeId).
             build();
         this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
+        this.featureControl = new FeatureControlManager.Builder().
+            setLogContext(logContext).
+            setQuorumFeatures(quorumFeatures).
+            setSnapshotRegistry(snapshotRegistry).
+            build();
         this.clusterControl = new ClusterControlManager.Builder().
             setLogContext(logContext).
             setClusterId(clusterId).
@@ -1565,12 +1575,8 @@ public final class QuorumController implements Controller {
             setSessionTimeoutNs(sessionTimeoutNs).
             setReplicaPlacer(replicaPlacer).
             setControllerMetrics(controllerMetrics).
+            setFeatureControlManager(featureControl).
             build();
-        this.featureControl = new FeatureControlManager.Builder().
-                setLogContext(logContext).
-                setQuorumFeatures(quorumFeatures).
-                setSnapshotRegistry(snapshotRegistry).
-                build();
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
         this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 13e41c978c..1c81954617 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -78,6 +78,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -652,11 +653,11 @@ public class ReplicationControlManager {
                 validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
                 replicationFactor = OptionalInt.of(assignment.brokerIds().size());
                 List<Integer> isr = assignment.brokerIds().stream().
-                    filter(clusterControl::unfenced).collect(Collectors.toList());
+                    filter(clusterControl::active).collect(Collectors.toList());
                 if (isr.isEmpty()) {
                     return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
                         "All brokers specified in the manual partition assignment for " +
-                        "partition " + assignment.partitionIndex() + " are fenced.");
+                        "partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown.");
                 }
                 newParts.put(assignment.partitionIndex(), new PartitionRegistration(
                     Replicas.toArray(assignment.brokerIds()), Replicas.toArray(isr),
@@ -682,25 +683,41 @@ public class ReplicationControlManager {
             short replicationFactor = topic.replicationFactor() == -1 ?
                 defaultReplicationFactor : topic.replicationFactor();
             try {
-                List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+                List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
                     0,
                     numPartitions,
                     replicationFactor
                 ), clusterDescriber);
-                for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
-                    int[] r = Replicas.toArray(replicas.get(partitionId));
+                for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+                    List<Integer> replicas = partitions.get(partitionId);
+                    List<Integer> isr = replicas.stream().
+                        filter(clusterControl::active).collect(Collectors.toList());
+                    // If the ISR is empty, it means that all brokers are fenced or
+                    // in controlled shutdown. To be consistent with the replica placer,
+                    // we reject the create topic request with INVALID_REPLICATION_FACTOR.
+                    if (isr.isEmpty()) {
+                        return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
+                            "Unable to replicate the partition " + replicationFactor +
+                                " time(s): All brokers are currently fenced or in controlled shutdown.");
+                    }
                     newParts.put(partitionId,
-                        new PartitionRegistration(r, r, Replicas.NONE, Replicas.NONE, r[0], LeaderRecoveryState.RECOVERED, 0, 0));
+                        new PartitionRegistration(
+                            Replicas.toArray(replicas),
+                            Replicas.toArray(isr),
+                            Replicas.NONE,
+                            Replicas.NONE,
+                            isr.get(0),
+                            LeaderRecoveryState.RECOVERED,
+                            0,
+                            0));
                 }
             } catch (InvalidReplicationFactorException e) {
                 return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                     "Unable to replicate the partition " + replicationFactor +
                         " time(s): " + e.getMessage());
             }
-            ApiError error = maybeCheckCreateTopicPolicy(() -> {
-                return new CreateTopicPolicy.RequestMetadata(
-                    topic.name(), numPartitions, replicationFactor, null, creationConfigs);
-            });
+            ApiError error = maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata(
+                topic.name(), numPartitions, replicationFactor, null, creationConfigs));
             if (error.isFailure()) return error;
         }
         Uuid topicId = Uuid.randomUuid();
@@ -937,7 +954,7 @@ public class ReplicationControlManager {
                     partition,
                     topic.id,
                     partitionId,
-                    r -> clusterControl.unfenced(r),
+                    clusterControl::active,
                     featureControl.metadataVersion().isLeaderRecoverySupported());
                 if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())) {
                     builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
@@ -1138,7 +1155,7 @@ public class ReplicationControlManager {
     /**
      * Generate the appropriate records to handle a broker becoming unfenced.
      *
-     * First, we create an UnfenceBrokerRecord. Then, we check if if there are any
+     * First, we create an UnfenceBrokerRecord. Then, we check if there are any
      * partitions that don't currently have a leader that should be led by the newly
      * unfenced broker.
      *
@@ -1160,6 +1177,29 @@ public class ReplicationControlManager {
             brokersToIsrs.partitionsWithNoLeader());
     }
 
+    /**
+     * Generate the appropriate records to handle a broker starting a controlled shutdown.
+     *
+     * First, we create a BrokerRegistrationChangeRecord. Then, we remove this broker
+     * from any non-singleton ISR and elect new leaders for partitions led by this
+     * broker.
+     *
+     * @param brokerId      The broker id.
+     * @param brokerEpoch   The broker epoch.
+     * @param records       The record list to append to.
+     */
+    void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMessageAndVersion> records) {
+        if (featureControl.metadataVersion().isInControlledShutdownStateSupported()
+                && !clusterControl.inControlledShutdown(brokerId)) {
+            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
+                setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
+                (short) 1));
+        }
+        generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]",
+            brokerId, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+    }
+
     ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
         ElectionType electionType = electionType(request.electionType());
         List<ApiMessageAndVersion> records = new ArrayList<>();
@@ -1245,7 +1285,7 @@ public class ReplicationControlManager {
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
             topicId,
             partitionId,
-            r -> clusterControl.unfenced(r),
+            clusterControl::active,
             featureControl.metadataVersion().isLeaderRecoverySupported());
         builder.setElection(election);
         Optional<ApiMessageAndVersion> record = builder.build();
@@ -1278,8 +1318,7 @@ public class ReplicationControlManager {
                     handleBrokerUnfenced(brokerId, brokerEpoch, records);
                     break;
                 case CONTROLLED_SHUTDOWN:
-                    generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]",
-                        brokerId, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+                    handleBrokerInControlledShutdown(brokerId, brokerEpoch, records);
                     break;
                 case SHUTDOWN_NOW:
                     handleBrokerFenced(brokerId, records);
@@ -1361,7 +1400,7 @@ public class ReplicationControlManager {
                 partition,
                 topicPartition.topicId(),
                 topicPartition.partitionId(),
-                r -> clusterControl.unfenced(r),
+                clusterControl::active,
                 featureControl.metadataVersion().isLeaderRecoverySupported()
             );
             builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
@@ -1371,11 +1410,6 @@ public class ReplicationControlManager {
         return ControllerResult.of(records, rescheduleImmidiately);
     }
 
-    // Visible for testing
-    Boolean isBrokerUnfenced(int brokerId) {
-        return clusterControl.unfenced(brokerId);
-    }
-
     ControllerResult<List<CreatePartitionsTopicResult>>
             createPartitions(List<CreatePartitionsTopic> topics) {
         List<ApiMessageAndVersion> records = new ArrayList<>();
@@ -1449,11 +1483,11 @@ public class ReplicationControlManager {
                     OptionalInt.of(replicationFactor));
                 placements.add(assignment.brokerIds());
                 List<Integer> isr = assignment.brokerIds().stream().
-                    filter(clusterControl::unfenced).collect(Collectors.toList());
+                    filter(clusterControl::active).collect(Collectors.toList());
                 if (isr.isEmpty()) {
                     throw new InvalidReplicaAssignmentException(
                         "All brokers specified in the manual partition assignment for " +
-                            "partition " + (startPartitionId + i) + " are fenced.");
+                            "partition " + (startPartitionId + i) + " are fenced or in controlled shutdown.");
                 }
                 isrs.add(isr);
             }
@@ -1467,12 +1501,21 @@ public class ReplicationControlManager {
         }
         int partitionId = startPartitionId;
         for (int i = 0; i < placements.size(); i++) {
-            List<Integer> placement = placements.get(i);
-            List<Integer> isr = isrs.get(i);
+            List<Integer> replicas = placements.get(i);
+            List<Integer> isr = isrs.get(i).stream().
+                filter(clusterControl::active).collect(Collectors.toList());
+            // If the ISR is empty, it means that all brokers are fenced or
+            // in controlled shutdown. To be consistent with the replica placer,
+            // we reject the create topic request with INVALID_REPLICATION_FACTOR.
+            if (isr.isEmpty()) {
+                throw new InvalidReplicationFactorException(
+                    "Unable to replicate the partition " + replicationFactor +
+                        " time(s): All brokers are currently fenced or in controlled shutdown.");
+            }
             records.add(new ApiMessageAndVersion(new PartitionRecord().
                 setPartitionId(partitionId).
                 setTopicId(topicId).
-                setReplicas(placement).
+                setReplicas(replicas).
                 setIsr(isr).
                 setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
                 setRemovingReplicas(Collections.emptyList()).
@@ -1547,14 +1590,14 @@ public class ReplicationControlManager {
         // where there is an unclean leader election which chooses a leader from outside
         // the ISR.
         Function<Integer, Boolean> isAcceptableLeader =
-            r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.unfenced(r));
+            r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.active(r));
 
         while (iterator.hasNext()) {
             TopicIdPartition topicIdPart = iterator.next();
             TopicControlInfo topic = topics.get(topicIdPart.topicId());
             if (topic == null) {
                 throw new RuntimeException("Topic ID " + topicIdPart.topicId() +
-                        " existed in isrMembers, but not in the topics map.");
+                    " existed in isrMembers, but not in the topics map.");
             }
             PartitionRegistration partition = topic.parts.get(topicIdPart.partitionId());
             if (partition == null) {
@@ -1674,7 +1717,7 @@ public class ReplicationControlManager {
         PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
             tp.topicId(),
             tp.partitionId(),
-            r -> clusterControl.unfenced(r),
+            clusterControl::active,
             featureControl.metadataVersion().isLeaderRecoverySupported());
         if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
@@ -1726,7 +1769,7 @@ public class ReplicationControlManager {
         PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
             tp.topicId(),
             tp.partitionId(),
-            r -> clusterControl.unfenced(r),
+            clusterControl::active,
             featureControl.metadataVersion().isLeaderRecoverySupported());
         if (!reassignment.merged().equals(currentReplicas)) {
             builder.setTargetReplicas(reassignment.merged());
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index 1c4d66b9e9..110e68fb89 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
@@ -90,22 +92,38 @@ public final class ClusterDelta {
     }
 
     public void replay(FenceBrokerRecord record) {
-        BrokerRegistration broker = getBrokerOrThrow(record.id(), record.epoch(), "fence");
-        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+        BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "fence");
+        changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
+            BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
+            Optional.empty()
+        )));
     }
 
     public void replay(UnfenceBrokerRecord record) {
-        BrokerRegistration broker = getBrokerOrThrow(record.id(), record.epoch(), "unfence");
-        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(false)));
+        BrokerRegistration curRegistration = getBrokerOrThrow(record.id(), record.epoch(), "unfence");
+        changedBrokers.put(record.id(), Optional.of(curRegistration.cloneWith(
+            BrokerRegistrationFencingChange.FENCE.asBoolean(),
+            Optional.empty()
+        )));
     }
 
     public void replay(BrokerRegistrationChangeRecord record) {
-        BrokerRegistration broker =
+        BrokerRegistration curRegistration =
             getBrokerOrThrow(record.brokerId(), record.brokerEpoch(), "change");
-        if (record.fenced() < 0) {
-            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(false)));
-        } else if (record.fenced() > 0) {
-            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(true)));
+        BrokerRegistrationFencingChange fencingChange =
+            BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
+                () -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
+                    "value for fenced field: %d", record, record.fenced())));
+        BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
+            BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
+                () -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
+                    "value for inControlledShutdown field: %d", record, record.inControlledShutdown())));
+        BrokerRegistration nextRegistration = curRegistration.cloneWith(
+            fencingChange.asBoolean(),
+            inControlledShutdownChange.asBoolean()
+        );
+        if (!curRegistration.equals(nextRegistration)) {
+            changedBrokers.put(record.brokerId(), Optional.of(nextRegistration));
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
index 3cf36fa088..d513cbca35 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
@@ -19,6 +19,7 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -27,7 +28,6 @@ import java.util.Map;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-
 /**
  * Represents the cluster in the metadata image.
  *
@@ -54,10 +54,10 @@ public final class ClusterImage {
         return brokers.get(nodeId);
     }
 
-    public void write(Consumer<List<ApiMessageAndVersion>> out) {
+    public void write(Consumer<List<ApiMessageAndVersion>> out, MetadataVersion metadataVersion) {
         List<ApiMessageAndVersion> batch = new ArrayList<>();
         for (BrokerRegistration broker : brokers.values()) {
-            batch.add(broker.toRecord());
+            batch.add(broker.toRecord(metadataVersion));
         }
         out.accept(batch);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index 48bed5f8a9..e3cd94a0cb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -23,6 +23,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Consumer;
+import org.apache.kafka.server.common.MetadataVersion;
 
 
 /**
@@ -120,10 +121,16 @@ public final class MetadataImage {
     }
 
     public void write(Consumer<List<ApiMessageAndVersion>> out) {
+        // We use the minimum KRaft metadata version if this image does
+        // not have a specific version set.
+        MetadataVersion metadataVersion = features.metadataVersion();
+        if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
+            metadataVersion = MetadataVersion.IBP_3_0_IV1;
+        }
         // Features should be written out first so we can include the metadata.version at the beginning of the
         // snapshot
         features.write(out);
-        cluster.write(out);
+        cluster.write(out, metadataVersion);
         topics.write(out);
         configs.write(out);
         clientQuotas.write(out);
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index cc8ed9b4aa..d1d3455065 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -20,6 +20,7 @@ package org.apache.kafka.metadata;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
@@ -36,9 +37,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
-
-
 /**
  * An immutable class which represents broker registrations.
  */
@@ -58,6 +56,7 @@ public class BrokerRegistration {
     private final Map<String, VersionRange> supportedFeatures;
     private final Optional<String> rack;
     private final boolean fenced;
+    private final boolean inControlledShutdown;
 
     public BrokerRegistration(int id,
                               long epoch,
@@ -65,8 +64,10 @@ public class BrokerRegistration {
                               List<Endpoint> listeners,
                               Map<String, VersionRange> supportedFeatures,
                               Optional<String> rack,
-                              boolean fenced) {
-        this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack, fenced);
+                              boolean fenced,
+                              boolean inControlledShutdown) {
+        this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures, rack,
+            fenced, inControlledShutdown);
     }
 
     public BrokerRegistration(int id,
@@ -75,7 +76,8 @@ public class BrokerRegistration {
                               Map<String, Endpoint> listeners,
                               Map<String, VersionRange> supportedFeatures,
                               Optional<String> rack,
-                              boolean fenced) {
+                              boolean fenced,
+                              boolean inControlledShutdown) {
         this.id = id;
         this.epoch = epoch;
         this.incarnationId = incarnationId;
@@ -92,6 +94,7 @@ public class BrokerRegistration {
         Objects.requireNonNull(rack);
         this.rack = rack;
         this.fenced = fenced;
+        this.inControlledShutdown = inControlledShutdown;
     }
 
     public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -113,7 +116,8 @@ public class BrokerRegistration {
             listeners,
             supportedFeatures,
             Optional.ofNullable(record.rack()),
-            record.fenced());
+            record.fenced(),
+            record.inControlledShutdown());
     }
 
     public int id() {
@@ -152,13 +156,22 @@ public class BrokerRegistration {
         return fenced;
     }
 
-    public ApiMessageAndVersion toRecord() {
+    public boolean inControlledShutdown() {
+        return inControlledShutdown;
+    }
+
+    public ApiMessageAndVersion toRecord(MetadataVersion metadataVersion) {
         RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
             setBrokerId(id).
             setRack(rack.orElse(null)).
             setBrokerEpoch(epoch).
             setIncarnationId(incarnationId).
             setFenced(fenced);
+
+        if (metadataVersion.isInControlledShutdownStateSupported()) {
+            registrationRecord.setInControlledShutdown(inControlledShutdown);
+        }
+
         for (Entry<String, Endpoint> entry : listeners.entrySet()) {
             Endpoint endpoint = entry.getValue();
             registrationRecord.endPoints().add(new BrokerEndpoint().
@@ -167,20 +180,22 @@ public class BrokerRegistration {
                 setPort(endpoint.port()).
                 setSecurityProtocol(endpoint.securityProtocol().id));
         }
+
         for (Entry<String, VersionRange> entry : supportedFeatures.entrySet()) {
             registrationRecord.features().add(new BrokerFeature().
                 setName(entry.getKey()).
                 setMinSupportedVersion(entry.getValue().min()).
                 setMaxSupportedVersion(entry.getValue().max()));
         }
+
         return new ApiMessageAndVersion(registrationRecord,
-                REGISTER_BROKER_RECORD.highestSupportedVersion());
+            metadataVersion.registerBrokerRecordVersion());
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
-            rack, fenced);
+            rack, fenced, inControlledShutdown);
     }
 
     @Override
@@ -193,7 +208,8 @@ public class BrokerRegistration {
             other.listeners.equals(listeners) &&
             other.supportedFeatures.equals(supportedFeatures) &&
             other.rack.equals(rack) &&
-            other.fenced == fenced;
+            other.fenced == fenced &&
+            other.inControlledShutdown == inControlledShutdown;
     }
 
     @Override
@@ -213,12 +229,30 @@ public class BrokerRegistration {
         bld.append("}");
         bld.append(", rack=").append(rack);
         bld.append(", fenced=").append(fenced);
+        bld.append(", inControlledShutdown=").append(inControlledShutdown);
         bld.append(")");
         return bld.toString();
     }
 
-    public BrokerRegistration cloneWithFencing(boolean fencing) {
-        return new BrokerRegistration(id, epoch, incarnationId, listeners,
-            supportedFeatures, rack, fencing);
+    public BrokerRegistration cloneWith(
+        Optional<Boolean> fencingChange,
+        Optional<Boolean> inControlledShutdownChange
+    ) {
+        boolean newFenced = fencingChange.orElse(fenced);
+        boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown);
+
+        if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown)
+            return this;
+
+        return new BrokerRegistration(
+            id,
+            epoch,
+            incarnationId,
+            listeners,
+            supportedFeatures,
+            rack,
+            newFenced,
+            newInControlledShutdownChange
+        );
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
new file mode 100644
index 0000000000..39f8abf595
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum BrokerRegistrationInControlledShutdownChange {
+    // Note that Optional.of(false) is not a valid state change here. The only
+    // way to leave the in controlled shutdown state is by registering the
+    // broker with a new incarnation id.
+    NONE(0, Optional.empty()),
+    IN_CONTROLLED_SHUTDOWN(1, Optional.of(true));
+
+    private final byte value;
+
+    private final Optional<Boolean> asBoolean;
+
+    private final static Map<Byte, BrokerRegistrationInControlledShutdownChange> VALUE_TO_ENUM =
+        Arrays.stream(BrokerRegistrationInControlledShutdownChange.values()).
+            collect(Collectors.toMap(v -> Byte.valueOf(v.value()), Function.identity()));
+
+    public static Optional<BrokerRegistrationInControlledShutdownChange> fromValue(byte value) {
+        return Optional.ofNullable(VALUE_TO_ENUM.get(value));
+    }
+
+    BrokerRegistrationInControlledShutdownChange(int value, Optional<Boolean> asBoolean) {
+        this.value = (byte) value;
+        this.asBoolean = asBoolean;
+    }
+
+    public Optional<Boolean> asBoolean() {
+        return asBoolean;
+    }
+
+    public byte value() {
+        return value;
+    }
+}
diff --git a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
index 152508ce54..81bebaaff2 100644
--- a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
+++ b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
@@ -17,7 +17,7 @@
   "apiKey": 17,
   "type": "metadata",
   "name": "BrokerRegistrationChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@@ -25,6 +25,8 @@
    { "name": "BrokerEpoch", "type": "int64", "versions": "0+",
      "about": "The broker epoch assigned by the controller." },
    { "name": "Fenced", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 0,
-     "about": "-1 if the broker has been unfenced, 0 if no change, 1 if the broker has been fenced." }
+     "about": "-1 if the broker has been unfenced, 0 if no change, 1 if the broker has been fenced." },
+   { "name": "InControlledShutdown", "type": "int8", "versions": "1+", "taggedVersions": "1+", "tag": 1,
+     "about": "0 if no change, 1 if the broker is in controlled shutdown." }
   ]
 }
diff --git a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index a0e7af2fbe..a32c16d8a6 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -17,7 +17,7 @@
   "apiKey": 0,
   "type": "metadata",
   "name": "RegisterBrokerRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@@ -49,6 +49,8 @@
     { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+",
       "about": "The broker rack." },
     { "name": "Fenced", "type": "bool", "versions": "0+", "default": "true",
-      "about": "True if the broker is fenced." }
+      "about": "True if the broker is fenced." },
+    { "name": "InControlledShutdown", "type": "bool", "versions": "1+", "default": "false",
+      "about": "True if the broker is in controlled shutdown." }
   ]
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 590578f6c6..cd44e2e678 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
@@ -40,6 +41,9 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.placement.ClusterDescriber;
@@ -69,11 +73,19 @@ public class ClusterControlManagerTest {
         MockTime time = new MockTime(0, 0, 0);
 
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(MetadataVersion.latest()).
+            build();
         ClusterControlManager clusterControl = new ClusterControlManager.Builder().
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
             setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
@@ -118,15 +130,134 @@ public class ClusterControlManagerTest {
         assertFalse(clusterControl.unfenced(1));
     }
 
+    @Test
+    public void testReplayRegisterBrokerRecord() {
+        MockTime time = new MockTime(0, 0, 0);
+
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(MetadataVersion.latest()).
+            build();
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
+            setTime(time).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
+            build();
+
+        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.inControlledShutdown(0));
+
+        RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
+            setBrokerEpoch(100).
+            setBrokerId(0).
+            setRack(null).
+            setFenced(true).
+            setInControlledShutdown(true);
+        brokerRecord.endPoints().add(new BrokerEndpoint().
+            setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+            setPort((short) 9092).
+            setName("PLAINTEXT").
+            setHost("example.com"));
+        clusterControl.replay(brokerRecord);
+
+        assertFalse(clusterControl.unfenced(0));
+        assertTrue(clusterControl.inControlledShutdown(0));
+
+        brokerRecord.setInControlledShutdown(false);
+        clusterControl.replay(brokerRecord);
+
+        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.inControlledShutdown(0));
+
+        brokerRecord.setFenced(false);
+        clusterControl.replay(brokerRecord);
+
+        assertTrue(clusterControl.unfenced(0));
+        assertFalse(clusterControl.inControlledShutdown(0));
+    }
+
+    @Test
+    public void testReplayBrokerRegistrationChangeRecord() {
+        MockTime time = new MockTime(0, 0, 0);
+
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(MetadataVersion.latest()).
+            build();
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
+            setTime(time).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
+            build();
+
+        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.inControlledShutdown(0));
+
+        RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
+            setBrokerEpoch(100).
+            setBrokerId(0).
+            setRack(null).
+            setFenced(false);
+        brokerRecord.endPoints().add(new BrokerEndpoint().
+            setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
+            setPort((short) 9092).
+            setName("PLAINTEXT").
+            setHost("example.com"));
+        clusterControl.replay(brokerRecord);
+
+        assertTrue(clusterControl.unfenced(0));
+        assertFalse(clusterControl.inControlledShutdown(0));
+
+        BrokerRegistrationChangeRecord registrationChangeRecord = new BrokerRegistrationChangeRecord()
+            .setBrokerId(0)
+            .setBrokerEpoch(100)
+            .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
+        clusterControl.replay(registrationChangeRecord);
+
+        assertTrue(clusterControl.unfenced(0));
+        assertTrue(clusterControl.inControlledShutdown(0));
+
+        registrationChangeRecord = new BrokerRegistrationChangeRecord()
+            .setBrokerId(0)
+            .setBrokerEpoch(100)
+            .setFenced(BrokerRegistrationFencingChange.FENCE.value());
+        clusterControl.replay(registrationChangeRecord);
+
+        assertTrue(clusterControl.unfenced(0));
+        assertTrue(clusterControl.inControlledShutdown(0));
+    }
+
     @Test
     public void testRegistrationWithIncorrectClusterId() throws Exception {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(MetadataVersion.latest()).
+            build();
         ClusterControlManager clusterControl = new ClusterControlManager.Builder().
             setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
             setTime(new MockTime(0, 0, 0)).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
             setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
         assertThrows(InconsistentClusterIdException.class, () ->
@@ -139,6 +270,49 @@ public class ClusterControlManagerTest {
                 new FinalizedControllerFeatures(Collections.emptyMap(), 456L)));
     }
 
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
+    public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(metadataVersion).
+            build();
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
+            setTime(new MockTime(0, 0, 0)).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
+            build();
+        clusterControl.activate();
+
+        ControllerResult<BrokerRegistrationReply> result = clusterControl.registerBroker(
+            new BrokerRegistrationRequestData().
+                setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
+                setBrokerId(0).
+                setRack(null).
+                setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+            123L,
+            new FinalizedControllerFeatures(Collections.emptyMap(), 456L));
+
+        short expectedVersion = metadataVersion.registerBrokerRecordVersion();
+
+        assertEquals(
+            Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+                setBrokerEpoch(123L).
+                setBrokerId(0).
+                setRack(null).
+                setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
+                setFenced(true).
+                setInControlledShutdown(false), expectedVersion)),
+            result.records());
+    }
+
     @Test
     public void testUnregister() throws Exception {
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
@@ -152,19 +326,27 @@ public class ClusterControlManagerTest {
             setName("PLAINTEXT").
             setHost("example.com"));
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(MetadataVersion.latest()).
+            build();
         ClusterControlManager clusterControl = new ClusterControlManager.Builder().
             setTime(new MockTime(0, 0, 0)).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
             setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
         clusterControl.replay(brokerRecord);
         assertEquals(new BrokerRegistration(1, 100,
-            Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"), Collections.singletonMap("PLAINTEXT",
-            new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092)),
-            Collections.emptyMap(), Optional.of("arack"), true),
-                clusterControl.brokerRegistrations().get(1));
+                Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"), Collections.singletonMap("PLAINTEXT",
+                new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092)),
+                Collections.emptyMap(), Optional.of("arack"), true, false),
+            clusterControl.brokerRegistrations().get(1));
         UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord().
             setBrokerId(1).
             setBrokerEpoch(100);
@@ -177,11 +359,19 @@ public class ClusterControlManagerTest {
     public void testPlaceReplicas(int numUsableBrokers) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(MetadataVersion.latest()).
+            build();
         ClusterControlManager clusterControl = new ClusterControlManager.Builder().
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
             setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
         for (int i = 0; i < numUsableBrokers; i++) {
@@ -223,15 +413,24 @@ public class ClusterControlManagerTest {
         }
     }
 
-    @Test
-    public void testIterator() throws Exception {
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
+    public void testIterator(MetadataVersion metadataVersion) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(metadataVersion).
+            build();
         ClusterControlManager clusterControl = new ClusterControlManager.Builder().
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
             setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
@@ -250,6 +449,14 @@ public class ClusterControlManagerTest {
                 new UnfenceBrokerRecord().setId(i).setEpoch(100);
             clusterControl.replay(unfenceBrokerRecord);
         }
+        BrokerRegistrationChangeRecord registrationChangeRecord =
+            new BrokerRegistrationChangeRecord().
+                setBrokerId(0).
+                setBrokerEpoch(100).
+                setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.
+                    IN_CONTROLLED_SHUTDOWN.value());
+        clusterControl.replay(registrationChangeRecord);
+        short expectedVersion = metadataVersion.registerBrokerRecordVersion();
         RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
             Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(0).setRack(null).
@@ -258,7 +465,8 @@ public class ClusterControlManagerTest {
                         setPort((short) 9092).
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
-                setFenced(false), (short) 0)),
+                setInControlledShutdown(metadataVersion.isInControlledShutdownStateSupported()).
+                setFenced(false), expectedVersion)),
             Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(1).setRack(null).
                 setEndPoints(new BrokerEndpointCollection(Collections.singleton(
@@ -266,7 +474,7 @@ public class ClusterControlManagerTest {
                         setPort((short) 9093).
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
-                setFenced(false), (short) 0)),
+                setFenced(false), expectedVersion)),
             Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(2).setRack(null).
                 setEndPoints(new BrokerEndpointCollection(Collections.singleton(
@@ -274,7 +482,7 @@ public class ClusterControlManagerTest {
                         setPort((short) 9094).
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
-                setFenced(true), (short) 0))),
+                setFenced(true), expectedVersion))),
                 clusterControl.iterator(Long.MAX_VALUE));
     }
 }
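
A minimal sketch of the version gating that the iterator and registration tests above exercise. The class and helper names here are hypothetical; the real serialization lives in BrokerRegistration#toRecord and ClusterControlManager, but the gating idea is the same: the InControlledShutdown field is only written, and record version 1 only used, when the active metadata version supports it.

    import org.apache.kafka.common.metadata.RegisterBrokerRecord;
    import org.apache.kafka.server.common.ApiMessageAndVersion;
    import org.apache.kafka.server.common.MetadataVersion;

    final class SnapshotRecordVersionSketch {
        // Hypothetical helper mirroring the behavior checked by testIterator and
        // testRegisterBrokerRecordVersion: pick the record version from the metadata
        // version and only carry the new field when it is understood.
        static ApiMessageAndVersion toRegisterBrokerRecord(
                int brokerId,
                long brokerEpoch,
                boolean fenced,
                boolean inControlledShutdown,
                MetadataVersion metadataVersion) {
            RegisterBrokerRecord record = new RegisterBrokerRecord().
                setBrokerId(brokerId).
                setBrokerEpoch(brokerEpoch).
                setFenced(fenced);
            if (metadataVersion.isInControlledShutdownStateSupported()) {
                record.setInControlledShutdown(inControlledShutdown);
            }
            return new ApiMessageAndVersion(record, metadataVersion.registerBrokerRecordVersion());
        }
    }
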
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 284b8f7c16..ccdd3a5b23 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.controller;
 
+import java.util.Collections;
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
@@ -25,6 +27,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.ProducerIdsBlock;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.BeforeEach;
@@ -42,6 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class ProducerIdControlManagerTest {
 
     private SnapshotRegistry snapshotRegistry;
+    private FeatureControlManager featureControl;
     private ClusterControlManager clusterControl;
     private ProducerIdControlManager producerIdControlManager;
 
@@ -49,11 +53,19 @@ public class ProducerIdControlManagerTest {
     public void setUp() {
         final MockTime time = new MockTime();
         snapshotRegistry = new SnapshotRegistry(new LogContext());
+        featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(MetadataVersion.latest()).
+            build();
         clusterControl = new ClusterControlManager.Builder().
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
             setControllerMetrics(new MockControllerMetrics()).
+            setFeatureControlManager(featureControl).
             build();
 
         clusterControl.activate();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 4e898b924c..87afbf4199 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -234,7 +234,7 @@ public class QuorumControllerTest {
 
             // Brokers are only registered and should still be fenced
             allBrokers.forEach(brokerId -> {
-                assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                assertFalse(active.clusterControl().unfenced(brokerId),
                     "Broker " + brokerId + " should have been fenced");
             });
 
@@ -254,7 +254,7 @@ public class QuorumControllerTest {
             TestUtils.waitForCondition(() -> {
                     sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
                     for (Integer brokerId : brokersToFence) {
-                        if (active.replicationControl().isBrokerUnfenced(brokerId)) {
+                        if (active.clusterControl().unfenced(brokerId)) {
                             return false;
                         }
                     }
@@ -268,11 +268,11 @@ public class QuorumControllerTest {
 
             // At this point only the brokers we want fenced should be fenced.
             brokersToKeepUnfenced.forEach(brokerId -> {
-                assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                assertTrue(active.clusterControl().unfenced(brokerId),
                     "Broker " + brokerId + " should have been unfenced");
             });
             brokersToFence.forEach(brokerId -> {
-                assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                assertFalse(active.clusterControl().unfenced(brokerId),
                     "Broker " + brokerId + " should have been fenced");
             });
 
@@ -326,7 +326,7 @@ public class QuorumControllerTest {
 
             // Brokers are only registered and should still be fenced
             allBrokers.forEach(brokerId -> {
-                assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                assertFalse(active.clusterControl().unfenced(brokerId),
                     "Broker " + brokerId + " should have been fenced");
             });
 
@@ -346,7 +346,7 @@ public class QuorumControllerTest {
                 () -> {
                     sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
                     for (Integer brokerId : brokersToFence) {
-                        if (active.replicationControl().isBrokerUnfenced(brokerId)) {
+                        if (active.clusterControl().unfenced(brokerId)) {
                             return false;
                         }
                     }
@@ -361,11 +361,11 @@ public class QuorumControllerTest {
 
             // At this point only the brokers we want fenced should be fenced.
             brokersToKeepUnfenced.forEach(brokerId -> {
-                assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                assertTrue(active.clusterControl().unfenced(brokerId),
                     "Broker " + brokerId + " should have been unfenced");
             });
             brokersToFence.forEach(brokerId -> {
-                assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                assertFalse(active.clusterControl().unfenced(brokerId),
                     "Broker " + brokerId + " should have been fenced");
             });
 
@@ -760,7 +760,7 @@ public class QuorumControllerTest {
                             new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9092).setSecurityProtocol((short) 0)).iterator())).
                 setRack(null).
-                setFenced(false), (short) 0),
+                setFenced(false), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB1")).
@@ -770,7 +770,7 @@ public class QuorumControllerTest {
                             new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9093).setSecurityProtocol((short) 0)).iterator())).
                 setRack(null).
-                setFenced(false), (short) 0),
+                setFenced(false), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB2")).
@@ -780,14 +780,14 @@ public class QuorumControllerTest {
                             new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                             setPort(9094).setSecurityProtocol((short) 0)).iterator())).
                 setRack(null).
-                setFenced(false), (short) 0),
+                setFenced(false), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB3")).
                 setEndPoints(new BrokerEndpointCollection(Arrays.asList(
                     new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                         setPort(9095).setSecurityProtocol((short) 0)).iterator())).
-                setRack(null), (short) 0),
+                setRack(null), (short) 1),
             new ApiMessageAndVersion(new ProducerIdsRecord().
                 setBrokerId(0).
                 setBrokerEpoch(brokerEpochs.get(0)).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d7141a76dd..59b5488f6a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -54,6 +55,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.Lis
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
 import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
@@ -68,6 +70,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.ReplicationControlManager.KRaftClusterDescriber;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.MockRandom;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -76,11 +79,13 @@ import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,6 +114,7 @@ import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
 import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
 import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
+import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICATION_FACTOR;
 import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
 import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
 import static org.apache.kafka.common.protocol.Errors.NONE;
@@ -139,6 +145,13 @@ public class ReplicationControlManagerTest {
         final MockTime time = new MockTime();
         final MockRandom random = new MockRandom();
         final ControllerMetrics metrics = new MockControllerMetrics();
+        final FeatureControlManager featureControl = new FeatureControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                QuorumFeatures.defaultFeatureMap(),
+                Collections.singletonList(0))).
+            setMetadataVersion(MetadataVersion.latest()).
+            build();
         final ClusterControlManager clusterControl = new ClusterControlManager.Builder().
             setLogContext(logContext).
             setTime(time).
@@ -146,6 +159,7 @@ public class ReplicationControlManagerTest {
             setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS)).
             setReplicaPlacer(new StripedReplicaPlacer(random)).
             setControllerMetrics(metrics).
+            setFeatureControlManager(featureControl).
             build();
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
@@ -162,7 +176,23 @@ public class ReplicationControlManagerTest {
             this(Optional.empty());
         }
 
+        ReplicationControlTestContext(MetadataVersion metadataVersion) {
+            this(metadataVersion, Optional.empty());
+        }
+
         ReplicationControlTestContext(Optional<CreateTopicPolicy> createTopicPolicy) {
+            this(MetadataVersion.latest(), createTopicPolicy);
+        }
+
+        ReplicationControlTestContext(MetadataVersion metadataVersion, Optional<CreateTopicPolicy> createTopicPolicy) {
+            FeatureControlManager featureControl = new FeatureControlManager.Builder().
+                setSnapshotRegistry(snapshotRegistry).
+                setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+                    QuorumFeatures.defaultFeatureMap(),
+                    Collections.singletonList(0))).
+                setMetadataVersion(metadataVersion).
+                build();
+
             this.replicationControl = new ReplicationControlManager.Builder().
                 setSnapshotRegistry(snapshotRegistry).
                 setLogContext(logContext).
@@ -171,6 +201,7 @@ public class ReplicationControlManagerTest {
                 setClusterControl(clusterControl).
                 setControllerMetrics(metrics).
                 setCreateTopicPolicy(createTopicPolicy).
+                setFeatureControl(featureControl).
                 build();
             clusterControl.activate();
         }
@@ -299,7 +330,7 @@ public class ReplicationControlManagerTest {
             replay(alterPartition.records());
         }
 
-        void unfenceBrokers(Integer... brokerIds)  throws Exception {
+        void unfenceBrokers(Integer... brokerIds) throws Exception {
             unfenceBrokers(Utils.mkSet(brokerIds));
         }
 
@@ -316,6 +347,20 @@ public class ReplicationControlManagerTest {
             }
         }
 
+        void inControlledShutdownBrokers(Integer... brokerIds) throws Exception {
+            inControlledShutdownBrokers(Utils.mkSet(brokerIds));
+        }
+
+        void inControlledShutdownBrokers(Set<Integer> brokerIds) throws Exception {
+            for (int brokerId : brokerIds) {
+                BrokerRegistrationChangeRecord record = new BrokerRegistrationChangeRecord()
+                    .setBrokerId(brokerId)
+                    .setBrokerEpoch(brokerId + 100)
+                    .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
+                replay(singletonList(new ApiMessageAndVersion(record, (short) 1)));
+            }
+        }
+
         void alterTopicConfig(
             String topic,
             String configKey,
@@ -401,38 +446,53 @@ public class ReplicationControlManagerTest {
         CreateTopicsRequestData request = new CreateTopicsRequestData();
         request.topics().add(new CreatableTopic().setName("foo").
             setNumPartitions(-1).setReplicationFactor((short) -1));
+
         ControllerResult<CreateTopicsResponseData> result =
             replicationControl.createTopics(request, Collections.singleton("foo"));
         CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
         expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
-            setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).
+            setErrorCode(INVALID_REPLICATION_FACTOR.code()).
                 setErrorMessage("Unable to replicate the partition 3 time(s): All " +
                     "brokers are currently fenced."));
         assertEquals(expectedResponse, result.response());
 
         ctx.registerBrokers(0, 1, 2);
-        ctx.unfenceBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0);
+        ctx.inControlledShutdownBrokers(0);
+
         ControllerResult<CreateTopicsResponseData> result2 =
             replicationControl.createTopics(request, Collections.singleton("foo"));
         CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
         expectedResponse2.topics().add(new CreatableTopicResult().setName("foo").
+            setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+            setErrorMessage("Unable to replicate the partition 3 time(s): All " +
+                "brokers are currently fenced or in controlled shutdown."));
+        assertEquals(expectedResponse2, result2.response());
+
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request, Collections.singleton("foo"));
+        CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
+        expectedResponse3.topics().add(new CreatableTopicResult().setName("foo").
             setNumPartitions(1).setReplicationFactor((short) 3).
             setErrorMessage(null).setErrorCode((short) 0).
-            setTopicId(result2.response().topics().find("foo").topicId()));
-        assertEquals(expectedResponse2, result2.response());
-        ctx.replay(result2.records());
+            setTopicId(result3.response().topics().find("foo").topicId()));
+        assertEquals(expectedResponse3, result3.response());
+        ctx.replay(result3.records());
         assertEquals(new PartitionRegistration(new int[] {1, 2, 0},
             new int[] {1, 2, 0}, Replicas.NONE, Replicas.NONE, 1, LeaderRecoveryState.RECOVERED, 0, 0),
             replicationControl.getPartition(
-                ((TopicRecord) result2.records().get(0).message()).topicId(), 0));
-        ControllerResult<CreateTopicsResponseData> result3 =
+                ((TopicRecord) result3.records().get(0).message()).topicId(), 0));
+        ControllerResult<CreateTopicsResponseData> result4 =
                 replicationControl.createTopics(request, Collections.singleton("foo"));
-        CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
-        expectedResponse3.topics().add(new CreatableTopicResult().setName("foo").
+        CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData();
+        expectedResponse4.topics().add(new CreatableTopicResult().setName("foo").
                 setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
                 setErrorMessage("Topic 'foo' already exists."));
-        assertEquals(expectedResponse3, result3.response());
-        Uuid fooId = result2.response().topics().find("foo").topicId();
+        assertEquals(expectedResponse4, result4.response());
+        Uuid fooId = result3.response().topics().find("foo").topicId();
         RecordTestUtils.assertBatchIteratorContains(asList(
             asList(new ApiMessageAndVersion(new PartitionRecord().
                     setPartitionId(0).setTopicId(fooId).
@@ -444,6 +504,46 @@ public class ReplicationControlManagerTest {
             ctx.replicationControl.iterator(Long.MAX_VALUE));
     }
 
+    @Test
+    public void testCreateTopicsISRInvariants() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(-1).setReplicationFactor((short) -1));
+
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1);
+        ctx.inControlledShutdownBrokers(1);
+
+        ControllerResult<CreateTopicsResponseData> result =
+            replicationControl.createTopics(request, Collections.singleton("foo"));
+
+        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+        expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
+            setNumPartitions(1).setReplicationFactor((short) 3).
+            setErrorMessage(null).setErrorCode((short) 0).
+            setTopicId(result.response().topics().find("foo").topicId()));
+        assertEquals(expectedResponse, result.response());
+
+        ctx.replay(result.records());
+
+        // Broker 2 cannot be in the ISR because it is fenced and broker 1
+        // cannot be in the ISR because it is in controlled shutdown.
+        assertEquals(
+            new PartitionRegistration(new int[]{1, 0, 2},
+                new int[]{0},
+                Replicas.NONE,
+                Replicas.NONE,
+                0,
+                LeaderRecoveryState.RECOVERED,
+                0,
+                0),
+            replicationControl.getPartition(
+                ((TopicRecord) result.records().get(0).message()).topicId(), 0));
+    }
+
     @Test
     public void testCreateTopicsWithConfigs() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -557,7 +657,7 @@ public class ReplicationControlManagerTest {
         assertEquals(0, result.records().size());
         CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
         expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
-            setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()).
+            setErrorCode(INVALID_REPLICATION_FACTOR.code()).
             setErrorMessage("Unable to replicate the partition 4 time(s): The target " +
                 "replication factor of 4 cannot be reached because only 3 broker(s) " +
                 "are registered."));
@@ -1014,7 +1114,6 @@ public class ReplicationControlManagerTest {
         assertEmptyTopicConfigs(ctx, "foo");
     }
 
-
     @Test
     public void testCreatePartitions() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -1100,6 +1199,79 @@ public class ReplicationControlManagerTest {
         ctx.replay(createPartitionsResult2.records());
     }
 
+    @Test
+    public void testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(1).setReplicationFactor((short) 2));
+
+        ctx.registerBrokers(0, 1);
+        ctx.unfenceBrokers(0, 1);
+
+        ControllerResult<CreateTopicsResponseData> createTopicResult = replicationControl.
+            createTopics(request, new HashSet<>(Arrays.asList("foo")));
+        ctx.replay(createTopicResult.records());
+
+        ctx.registerBrokers(0, 1);
+        ctx.unfenceBrokers(0);
+        ctx.inControlledShutdownBrokers(0);
+
+        List<CreatePartitionsTopic> topics = new ArrayList<>();
+        topics.add(new CreatePartitionsTopic().
+            setName("foo").setCount(2).setAssignments(null));
+        ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
+            replicationControl.createPartitions(topics);
+
+        assertEquals(
+            asList(new CreatePartitionsTopicResult().
+                setName("foo").
+                setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                setErrorMessage("Unable to replicate the partition 2 time(s): All " +
+                    "brokers are currently fenced or in controlled shutdown.")),
+            createPartitionsResult.response());
+    }
+
+    @Test
+    public void testCreatePartitionsISRInvariants() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(1).setReplicationFactor((short) 3));
+
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1);
+        ctx.inControlledShutdownBrokers(1);
+
+        ControllerResult<CreateTopicsResponseData> result =
+            replicationControl.createTopics(request, Collections.singleton("foo"));
+        ctx.replay(result.records());
+
+        List<CreatePartitionsTopic> topics = asList(new CreatePartitionsTopic().
+            setName("foo").setCount(2).setAssignments(null));
+
+        ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
+            replicationControl.createPartitions(topics);
+        ctx.replay(createPartitionsResult.records());
+
+        // Broker 2 cannot be in the ISR because it is fenced and broker 1
+        // cannot be in the ISR because it is in controlled shutdown.
+        assertEquals(
+            new PartitionRegistration(new int[]{0, 1, 2},
+                new int[]{0},
+                Replicas.NONE,
+                Replicas.NONE,
+                0,
+                LeaderRecoveryState.RECOVERED,
+                0,
+                0),
+            replicationControl.getPartition(
+                ((TopicRecord) result.records().get(0).message()).topicId(), 1));
+    }
+
     @Test
     public void testValidateGoodManualPartitionAssignments() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -1570,14 +1742,15 @@ public class ReplicationControlManagerTest {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
-        ctx.unfenceBrokers(2, 3, 4);
+        ctx.unfenceBrokers(1, 2, 3, 4);
+        ctx.inControlledShutdownBrokers(1);
         Uuid fooId = ctx.createTestTopic("foo", new int[][]{
             new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
         ElectLeadersRequestData request1 = new ElectLeadersRequestData().
             setElectionType(ElectionType.PREFERRED.value).
             setTopicPartitions(new TopicPartitionsCollection(asList(
                 new TopicPartitions().setTopic("foo").
-                    setPartitions(asList(0, 1)),
+                    setPartitions(asList(0, 1, 2)),
                 new TopicPartitions().setTopic("bar").
                     setPartitions(asList(0, 1))).iterator()));
         ControllerResult<ElectLeadersResponseData> election1Result =
@@ -1591,6 +1764,10 @@ public class ReplicationControlManagerTest {
                 new TopicPartition("foo", 1),
                 new ApiError(ELECTION_NOT_NEEDED)
             ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 2),
+                new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
+            ),
             Utils.mkEntry(
                 new TopicPartition("bar", 0),
                 new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
@@ -1602,14 +1779,21 @@ public class ReplicationControlManagerTest {
         ));
         assertElectLeadersResponse(expectedResponse1, election1Result.response());
         assertEquals(Collections.emptyList(), election1Result.records());
+
+        // Broker 1 must be registered to get out from the controlled shutdown state.
+        ctx.registerBrokers(1);
         ctx.unfenceBrokers(0, 1);
 
         ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
             new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
                 setTopics(asList(new AlterPartitionRequestData.TopicData().setName("foo").
-                    setPartitions(asList(new AlterPartitionRequestData.PartitionData().
-                        setPartitionIndex(0).setPartitionEpoch(0).
-                        setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)))))));
+                    setPartitions(asList(
+                        new AlterPartitionRequestData.PartitionData().
+                            setPartitionIndex(0).setPartitionEpoch(0).
+                            setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)),
+                        new AlterPartitionRequestData.PartitionData().
+                            setPartitionIndex(2).setPartitionEpoch(0).
+                            setLeaderEpoch(0).setNewIsr(asList(0, 2, 1)))))));
         assertEquals(new AlterPartitionResponseData().setTopics(asList(
             new AlterPartitionResponseData.TopicData().setName("foo").setPartitions(asList(
                 new AlterPartitionResponseData.PartitionData().
@@ -1618,6 +1802,13 @@ public class ReplicationControlManagerTest {
                     setLeaderEpoch(0).
                     setIsr(asList(1, 2, 3)).
                     setPartitionEpoch(1).
+                    setErrorCode(NONE.code()),
+                new AlterPartitionResponseData.PartitionData().
+                    setPartitionIndex(2).
+                    setLeaderId(2).
+                    setLeaderEpoch(0).
+                    setIsr(asList(0, 2, 1)).
+                    setPartitionEpoch(1).
                     setErrorCode(NONE.code()))))),
             alterPartitionResult.response());
 
@@ -1630,6 +1821,10 @@ public class ReplicationControlManagerTest {
                 new TopicPartition("foo", 1),
                 new ApiError(ELECTION_NOT_NEEDED)
             ),
+            Utils.mkEntry(
+                new TopicPartition("foo", 2),
+                ApiError.NONE
+            ),
             Utils.mkEntry(
                 new TopicPartition("bar", 0),
                 new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
@@ -1644,10 +1839,21 @@ public class ReplicationControlManagerTest {
         ControllerResult<ElectLeadersResponseData> election2Result =
             replication.electLeaders(request1);
         assertElectLeadersResponse(expectedResponse2, election2Result.response());
-        assertEquals(asList(new ApiMessageAndVersion(new PartitionChangeRecord().
-            setPartitionId(0).
-            setTopicId(fooId).
-            setLeader(1), (short) 0)), election2Result.records());
+        assertEquals(
+            asList(
+                new ApiMessageAndVersion(
+                    new PartitionChangeRecord().
+                        setPartitionId(0).
+                        setTopicId(fooId).
+                        setLeader(1),
+                    (short) 0),
+                new ApiMessageAndVersion(
+                    new PartitionChangeRecord().
+                        setPartitionId(2).
+                        setTopicId(fooId).
+                        setLeader(0),
+                    (short) 0)),
+            election2Result.records());
     }
 
     @Test
@@ -1797,4 +2003,45 @@ public class ReplicationControlManagerTest {
             new UsableBroker(3, Optional.empty(), false),
             new UsableBroker(4, Optional.empty(), false))), brokers);
     }
+
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
+    public void testProcessBrokerHeartbeatInControlledShutdown(MetadataVersion metadataVersion) throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext(metadataVersion);
+        ctx.registerBrokers(0, 1, 2);
+        ctx.unfenceBrokers(0, 1, 2);
+
+        Uuid topicId = ctx.createTestTopic("foo", new int[][]{new int[]{0, 1, 2}}).topicId();
+
+        BrokerHeartbeatRequestData heartbeatRequest = new BrokerHeartbeatRequestData()
+            .setBrokerId(0)
+            .setBrokerEpoch(100)
+            .setCurrentMetadataOffset(0)
+            .setWantShutDown(true);
+
+        ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl
+            .processBrokerHeartbeat(heartbeatRequest, 0);
+
+        List<ApiMessageAndVersion> expectedRecords = new ArrayList<>();
+
+        if (metadataVersion.isInControlledShutdownStateSupported()) {
+            expectedRecords.add(new ApiMessageAndVersion(
+                new BrokerRegistrationChangeRecord()
+                    .setBrokerEpoch(100)
+                    .setBrokerId(0)
+                    .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange
+                        .IN_CONTROLLED_SHUTDOWN.value()),
+                (short) 1));
+        }
+
+        expectedRecords.add(new ApiMessageAndVersion(
+            new PartitionChangeRecord()
+                .setPartitionId(0)
+                .setTopicId(topicId)
+                .setIsr(asList(1, 2))
+                .setLeader(1),
+            (short) 0));
+
+        assertEquals(expectedRecords, result.records());
+    }
 }
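
The new ISR tests above all check the same invariant from KIP-841: a replica whose broker is fenced or in controlled shutdown is skipped when building the initial ISR and when electing a leader. Below is a minimal, self-contained illustration of that rule; the names are hypothetical and are not the ReplicationControlManager API.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    final class IsrEligibilitySketch {
        // A broker is ISR-eligible only if it is neither fenced nor in controlled shutdown.
        static boolean eligible(boolean fenced, boolean inControlledShutdown) {
            return !fenced && !inControlledShutdown;
        }

        public static void main(String[] args) {
            // Mirrors the scenario in testCreateTopicsISRInvariants: broker 1 is in
            // controlled shutdown and broker 2 is fenced, so only broker 0 qualifies.
            boolean[][] state = {
                {false, false},  // broker 0: unfenced, not shutting down
                {false, true},   // broker 1: unfenced, in controlled shutdown
                {true,  false}   // broker 2: fenced
            };
            List<Integer> isr = Arrays.asList(0, 1, 2).stream()
                .filter(id -> eligible(state[id][0], state[id][1]))
                .collect(Collectors.toList());
            System.out.println("ISR-eligible brokers: " + isr);  // prints [0]
        }
    }
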
diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
index e10d4a5971..59d5d2fed9 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
@@ -19,14 +19,17 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -62,13 +65,15 @@ public class ClusterImageTest {
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
             Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
             Optional.empty(),
-            true));
+            true,
+            false));
         map1.put(1, new BrokerRegistration(1,
             1001,
             Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"),
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
             Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
             Optional.empty(),
+            false,
             false));
         map1.put(2, new BrokerRegistration(2,
             123,
@@ -76,6 +81,7 @@ public class ClusterImageTest {
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
             Collections.emptyMap(),
             Optional.of("arack"),
+            false,
             false));
         IMAGE1 = new ClusterImage(map1);
 
@@ -84,6 +90,10 @@ public class ClusterImageTest {
             setId(0).setEpoch(1000), UNFENCE_BROKER_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FenceBrokerRecord().
             setId(1).setEpoch(1001), FENCE_BROKER_RECORD.highestSupportedVersion()));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+            setBrokerId(0).setBrokerEpoch(1000).setInControlledShutdown(
+                BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
+            FENCE_BROKER_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
             setBrokerId(2).setBrokerEpoch(123),
             UNREGISTER_BROKER_RECORD.highestSupportedVersion()));
@@ -98,14 +108,16 @@ public class ClusterImageTest {
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
             Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
             Optional.empty(),
-            false));
+            false,
+            true));
         map2.put(1, new BrokerRegistration(1,
             1001,
             Uuid.fromString("U52uRe20RsGI0RvpcTx33Q"),
             Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
             Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3)),
             Optional.empty(),
-            true));
+            true,
+            false));
         IMAGE2 = new ClusterImage(map2);
     }
 
@@ -131,7 +143,7 @@ public class ClusterImageTest {
 
     private void testToImageAndBack(ClusterImage image) throws Throwable {
         MockSnapshotConsumer writer = new MockSnapshotConsumer();
-        image.write(writer);
+        image.write(writer, MetadataVersion.latest());
         ClusterDelta delta = new ClusterDelta(ClusterImage.EMPTY);
         RecordTestUtils.replayAllBatches(delta, writer.batches());
         ClusterImage nextImage = delta.apply();
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index d9622b8e4c..10d1169412 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -32,8 +33,7 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
 @Timeout(value = 40)
 public class BrokerRegistrationTest {
@@ -41,15 +41,15 @@ public class BrokerRegistrationTest {
         new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
             Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
             Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2)),
-            Optional.empty(), false),
+            Optional.empty(), false, false),
         new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
             Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
             Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2)),
-            Optional.empty(), false),
+            Optional.empty(), true, false),
         new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
             Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
             Collections.singletonMap("foo", VersionRange.of((short) 2, (short) 3)),
-            Optional.of("myrack"), false));
+            Optional.of("myrack"), false, true));
 
     @Test
     public void testValues() {
@@ -60,13 +60,13 @@ public class BrokerRegistrationTest {
 
     @Test
     public void testEquals() {
-        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(1)));
-        assertFalse(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(0)));
-        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(2)));
-        assertFalse(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(0)));
-        assertTrue(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(0)));
-        assertTrue(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(1)));
-        assertTrue(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(2)));
+        assertNotEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(1));
+        assertNotEquals(REGISTRATIONS.get(1), REGISTRATIONS.get(0));
+        assertNotEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(2));
+        assertNotEquals(REGISTRATIONS.get(2), REGISTRATIONS.get(0));
+        assertEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(0));
+        assertEquals(REGISTRATIONS.get(1), REGISTRATIONS.get(1));
+        assertEquals(REGISTRATIONS.get(2), REGISTRATIONS.get(2));
     }
 
     @Test
@@ -75,7 +75,7 @@ public class BrokerRegistrationTest {
             "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
-            "rack=Optional.empty, fenced=false)",
+            "rack=Optional.empty, fenced=true, inControlledShutdown=false)",
             REGISTRATIONS.get(1).toString());
     }
 
@@ -87,11 +87,11 @@ public class BrokerRegistrationTest {
     }
 
     private void testRoundTrip(BrokerRegistration registration) {
-        ApiMessageAndVersion messageAndVersion = registration.toRecord();
+        ApiMessageAndVersion messageAndVersion = registration.toRecord(MetadataVersion.latest());
         BrokerRegistration registration2 = BrokerRegistration.fromRecord(
             (RegisterBrokerRecord) messageAndVersion.message());
         assertEquals(registration, registration2);
-        ApiMessageAndVersion messageAndVersion2 = registration2.toRecord();
+        ApiMessageAndVersion messageAndVersion2 = registration2.toRecord(MetadataVersion.latest());
         assertEquals(messageAndVersion, messageAndVersion2);
     }
 
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index ee19faf88b..aa1a416c29 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -159,7 +159,10 @@ public enum MetadataVersion {
     IBP_3_3_IV1(6, "3.3", "IV1", true),
 
     // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
-    IBP_3_3_IV2(7, "3.3", "IV2", true);
+    IBP_3_3_IV2(7, "3.3", "IV2", true),
+
+    // Adds InControlledShutdown state to RegisterBrokerRecord and BrokerRegistrationChangeRecord (KIP-841).
+    IBP_3_3_IV3(8, "3.3", "IV3", true);
 
     public static final String FEATURE_NAME = "metadata.version";
 
@@ -243,6 +246,18 @@ public enum MetadataVersion {
         return this.isAtLeast(IBP_3_3_IV2);
     }
 
+    public boolean isInControlledShutdownStateSupported() {
+        return this.isAtLeast(IBP_3_3_IV3);
+    }
+
+    public short registerBrokerRecordVersion() {
+        if (isInControlledShutdownStateSupported()) {
+            return (short) 1;
+        } else {
+            return (short) 0;
+        }
+    }
+
     private static final Map<String, MetadataVersion> IBP_VERSIONS;
     static {
         {
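
A quick usage sketch of the two helpers added above, assuming the server-common classes from this commit are on the classpath; it simply prints which RegisterBrokerRecord version each of the two newest 3.3 metadata versions would use.

    import org.apache.kafka.server.common.MetadataVersion;

    final class RecordVersionGatingSketch {
        public static void main(String[] args) {
            for (MetadataVersion version : new MetadataVersion[] {
                    MetadataVersion.IBP_3_3_IV2, MetadataVersion.IBP_3_3_IV3}) {
                // Version 1 records carry InControlledShutdown; version 0 records do not.
                System.out.printf("%s -> RegisterBrokerRecord v%d, inControlledShutdown supported: %b%n",
                    version.version(),
                    version.registerBrokerRecordVersion(),
                    version.isInControlledShutdownStateSupported());
            }
        }
    }
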
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 07a644e729..2f96d5fd04 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -18,9 +18,11 @@
 package org.apache.kafka.server.common;
 
 import org.apache.kafka.common.record.RecordVersion;
-import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV1;
@@ -62,6 +64,7 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV3;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -190,10 +193,11 @@ class MetadataVersionTest {
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));
 
-        assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3"));
+        assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3"));
         assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3-IV0"));
         assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3-IV1"));
         assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3-IV2"));
+        assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3-IV3"));
     }
 
     @Test
@@ -240,6 +244,7 @@ class MetadataVersionTest {
         assertEquals("3.3", IBP_3_3_IV0.shortVersion());
         assertEquals("3.3", IBP_3_3_IV1.shortVersion());
         assertEquals("3.3", IBP_3_3_IV2.shortVersion());
+        assertEquals("3.3", IBP_3_3_IV3.shortVersion());
     }
 
     @Test
@@ -275,6 +280,7 @@ class MetadataVersionTest {
         assertEquals("3.3-IV0", IBP_3_3_IV0.version());
         assertEquals("3.3-IV1", IBP_3_3_IV1.version());
         assertEquals("3.3-IV2", IBP_3_3_IV2.version());
+        assertEquals("3.3-IV3", IBP_3_3_IV3.version());
     }
 
     @Test
@@ -318,4 +324,19 @@ class MetadataVersionTest {
             }
         }
     }
+
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testIsInControlledShutdownStateSupported(MetadataVersion metadataVersion) {
+        assertEquals(metadataVersion.isAtLeast(IBP_3_3_IV3),
+            metadataVersion.isInControlledShutdownStateSupported());
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {
+        short expectedVersion = metadataVersion.isAtLeast(IBP_3_3_IV3) ?
+            (short) 1 : (short) 0;
+        assertEquals(expectedVersion, metadataVersion.registerBrokerRecordVersion());
+    }
 }