You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2022/12/20 18:55:37 UTC

[kafka] branch trunk updated: KAFKA-14457; Controller metrics should only expose committed data (#12994)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 44b3177a087 KAFKA-14457; Controller metrics should only expose committed data (#12994)
44b3177a087 is described below

commit 44b3177a087ff809a9d95a27b63b10e00aa4da7d
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Tue Dec 20 10:55:14 2022 -0800

    KAFKA-14457; Controller metrics should only expose committed data (#12994)
    
    The controller metrics have three problems: 1) the active controller exposes uncommitted data in the metrics; 2) the active controller doesn't update the metrics when the uncommitted data gets aborted; 3) the controller doesn't update the metrics when the entire state gets reset.
    
    We fix these issues by only updating the metrics when processing committed metadata records and by resetting the metrics when the metadata state is reset.
    
    This change adds a new type `ControllerMetricsManager` which processes committed metadata records and updates the metrics accordingly. This change also removes metrics updating responsibilities from the rest of the controller managers.
    
    Reviewers: Ron Dagostino <rd...@confluent.io>
---
 checkstyle/checkstyle.xml                          |   6 +
 .../org/apache/kafka/controller/BrokersToIsrs.java |  17 -
 .../kafka/controller/ClusterControlManager.java    |  62 +---
 .../apache/kafka/controller/ControllerMetrics.java |   4 +-
 .../kafka/controller/ControllerMetricsManager.java | 318 ++++++++++++++++++
 .../kafka/controller/PartitionChangeBuilder.java   |  20 +-
 .../apache/kafka/controller/QuorumController.java  |  18 +-
 .../kafka/controller/QuorumControllerMetrics.java  |   8 +-
 .../controller/ReplicationControlManager.java      |  62 +---
 .../controller/ClusterControlManagerTest.java      |  43 +--
 .../controller/ControllerMetricsManagerTest.java   | 364 +++++++++++++++++++++
 .../kafka/controller/MockControllerMetrics.java    |   4 +-
 .../controller/ProducerIdControlManagerTest.java   |   1 -
 .../kafka/controller/QuorumControllerTest.java     |  16 +-
 .../controller/ReplicationControlManagerTest.java  | 142 --------
 .../apache/kafka/timeline/SnapshotRegistry.java    |   2 +-
 16 files changed, 765 insertions(+), 322 deletions(-)

diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index d0599f3d7a3..bf2d339da8c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -143,9 +143,15 @@
       <!-- default is 200 -->
       <property name="max" value="500"/>
     </module>
+
+    <!-- Allows the use of the @SuppressWarnings annotation in the code -->
+    <module name="SuppressWarningsHolder"/>
   </module>
 
   <module name="SuppressionFilter">
     <property name="file" value="${suppressionsFile}"/>
   </module>
+
+  <!-- Allows the use of the @SuppressWarnings annotation in the code -->
+  <module name="SuppressWarningsFilter"/>
 </module>
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index ec48cbc57a6..5f72a109736 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
-import org.apache.kafka.timeline.TimelineInteger;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -105,12 +104,9 @@ public class BrokersToIsrs {
      */
     private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;
     
-    private final TimelineInteger offlinePartitionCount;
-
     BrokersToIsrs(SnapshotRegistry snapshotRegistry) {
         this.snapshotRegistry = snapshotRegistry;
         this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.offlinePartitionCount = new TimelineInteger(snapshotRegistry);
     }
 
     /**
@@ -131,9 +127,6 @@ public class BrokersToIsrs {
         } else {
             if (prevLeader == NO_LEADER) {
                 prev = Replicas.copyWith(prevIsr, NO_LEADER);
-                if (nextLeader != NO_LEADER) {
-                    offlinePartitionCount.decrement();
-                }
             } else {
                 prev = Replicas.clone(prevIsr);
             }
@@ -145,9 +138,6 @@ public class BrokersToIsrs {
         } else {
             if (nextLeader == NO_LEADER) {
                 next = Replicas.copyWith(nextIsr, NO_LEADER);
-                if (prevLeader != NO_LEADER) {
-                    offlinePartitionCount.increment();
-                }
             } else {
                 next = Replicas.clone(nextIsr);
             }
@@ -191,9 +181,6 @@ public class BrokersToIsrs {
     void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
         Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
         if (topicMap != null) {
-            if (brokerId == NO_LEADER && topicMap.containsKey(topicId)) {
-                offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);
-            }
             topicMap.remove(topicId);
         }
     }
@@ -303,8 +290,4 @@ public class BrokersToIsrs {
     boolean hasLeaderships(int brokerId) {
         return iterator(brokerId, true).hasNext();
     }
-
-    int offlinePartitionCount() {
-        return offlinePartitionCount.get();
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index ba6c0e1d1c2..966e5c2137a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -83,7 +83,6 @@ public class ClusterControlManager {
         private SnapshotRegistry snapshotRegistry = null;
         private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
         private ReplicaPlacer replicaPlacer = null;
-        private ControllerMetrics controllerMetrics = null;
         private FeatureControlManager featureControl = null;
         private boolean zkMigrationEnabled = false;
 
@@ -117,11 +116,6 @@ public class ClusterControlManager {
             return this;
         }
 
-        Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
-            this.controllerMetrics = controllerMetrics;
-            return this;
-        }
-
         Builder setFeatureControlManager(FeatureControlManager featureControl) {
             this.featureControl = featureControl;
             return this;
@@ -145,9 +139,6 @@ public class ClusterControlManager {
             if (replicaPlacer == null) {
                 replicaPlacer = new StripedReplicaPlacer(new Random());
             }
-            if (controllerMetrics == null) {
-                throw new RuntimeException("You must specify ControllerMetrics");
-            }
             if (featureControl == null) {
                 throw new RuntimeException("You must specify FeatureControlManager");
             }
@@ -157,7 +148,6 @@ public class ClusterControlManager {
                 snapshotRegistry,
                 sessionTimeoutNs,
                 replicaPlacer,
-                controllerMetrics,
                 featureControl,
                 zkMigrationEnabled
             );
@@ -230,11 +220,6 @@ public class ClusterControlManager {
      */
     private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;
 
-    /**
-     * A reference to the controller's metrics registry.
-     */
-    private final ControllerMetrics controllerMetrics;
-
     /**
      * The broker heartbeat manager, or null if this controller is on standby.
      */
@@ -260,7 +245,6 @@ public class ClusterControlManager {
         SnapshotRegistry snapshotRegistry,
         long sessionTimeoutNs,
         ReplicaPlacer replicaPlacer,
-        ControllerMetrics metrics,
         FeatureControlManager featureControl,
         boolean zkMigrationEnabled
     ) {
@@ -274,7 +258,6 @@ public class ClusterControlManager {
         this.registerBrokerRecordOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
         this.heartbeatManager = null;
         this.readyBrokersFuture = Optional.empty();
-        this.controllerMetrics = metrics;
         this.featureControl = featureControl;
         this.zkMigrationEnabled = zkMigrationEnabled;
     }
@@ -415,8 +398,9 @@ public class ClusterControlManager {
     }
 
     public OptionalLong registerBrokerRecordOffset(int brokerId) {
-        if (registerBrokerRecordOffsets.containsKey(brokerId)) {
-            return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
+        Long registrationOffset = registerBrokerRecordOffsets.get(brokerId);
+        if (registrationOffset != null) {
+            return OptionalLong.of(registrationOffset);
         }
         return OptionalLong.empty();
     }
@@ -442,7 +426,6 @@ public class ClusterControlManager {
                     record.incarnationId(), listeners, features,
                     Optional.ofNullable(record.rack()), record.fenced(),
                     record.inControlledShutdown(), record.isMigratingZkBroker()));
-        updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
         if (heartbeatManager != null) {
             if (prevRegistration != null) heartbeatManager.remove(brokerId);
             heartbeatManager.register(brokerId, record.fenced());
@@ -469,7 +452,6 @@ public class ClusterControlManager {
         } else {
             if (heartbeatManager != null) heartbeatManager.remove(brokerId);
             brokerRegistrations.remove(brokerId);
-            updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Unregistered broker: {}", record);
         }
     }
@@ -498,11 +480,11 @@ public class ClusterControlManager {
         BrokerRegistrationFencingChange fencingChange =
             BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
                 () -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
-                    "value for fenced field: %d", record, record.fenced())));
+                    "value for fenced field: %x", record, record.fenced())));
         BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
             BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
                 () -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
-                    "value for inControlledShutdown field: %d", record, record.inControlledShutdown())));
+                    "value for inControlledShutdown field: %x", record, record.inControlledShutdown())));
         replayRegistrationChange(
             record,
             record.brokerId(),
@@ -533,7 +515,6 @@ public class ClusterControlManager {
             );
             if (!curRegistration.equals(nextRegistration)) {
                 brokerRegistrations.put(brokerId, nextRegistration);
-                updateMetrics(curRegistration, nextRegistration);
             } else {
                 log.info("Ignoring no-op registration change for {}", curRegistration);
             }
@@ -547,35 +528,6 @@ public class ClusterControlManager {
         }
     }
 
-    private void updateMetrics(BrokerRegistration prevRegistration, BrokerRegistration registration) {
-        if (registration == null) {
-            if (prevRegistration.fenced()) {
-                controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
-            } else {
-                controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
-            }
-            log.info("Removed broker: {}", prevRegistration.id());
-        } else if (prevRegistration == null) {
-            if (registration.fenced()) {
-                controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
-                log.info("Added new fenced broker: {}", registration.id());
-            } else {
-                controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
-                log.info("Added new unfenced broker: {}", registration.id());
-            }
-        } else {
-            if (prevRegistration.fenced() && !registration.fenced()) {
-                controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
-                controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
-                log.info("Unfenced broker: {}", registration.id());
-            } else if (!prevRegistration.fenced() && registration.fenced()) {
-                controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
-                controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
-                log.info("Fenced broker: {}", registration.id());
-            }
-        }
-    }
-
     Iterator<UsableBroker> usableBrokers() {
         if (heartbeatManager == null) {
             throw new RuntimeException("ClusterControlManager is not active.");
@@ -588,7 +540,7 @@ public class ClusterControlManager {
      * Returns true if the broker is unfenced; Returns false if it is
      * not or if it does not exist.
      */
-    public boolean unfenced(int brokerId) {
+    public boolean isUnfenced(int brokerId) {
         BrokerRegistration registration = brokerRegistrations.get(brokerId);
         if (registration == null) return false;
         return !registration.fenced();
@@ -618,7 +570,7 @@ public class ClusterControlManager {
      * Returns true if the broker is active. Active means not fenced nor in controlled
      * shutdown; Returns false if it is not active or if it does not exist.
      */
-    public boolean active(int brokerId) {
+    public boolean isActive(int brokerId) {
         BrokerRegistration registration = brokerRegistrations.get(brokerId);
         if (registration == null) return false;
         return !registration.inControlledShutdown() && !registration.fenced();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index ff243aebfcb..baab5854004 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -35,9 +35,9 @@ public interface ControllerMetrics extends AutoCloseable {
 
     int activeBrokerCount();
 
-    void setGlobalTopicsCount(int topicCount);
+    void setGlobalTopicCount(int topicCount);
 
-    int globalTopicsCount();
+    int globalTopicCount();
 
     void setGlobalPartitionCount(int partitionCount);
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
new file mode 100644
index 00000000000..d034f42cdd4
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    /** Immutable leader / preferred-replica pair tracked for each partition. */
+    private static final class PartitionState {
+        private final int leader;
+        private final int preferredReplica;
+        PartitionState(int leader, int preferredReplica) {
+            this.leader = leader;
+            this.preferredReplica = preferredReplica;
+        }
+
+        int leader() {
+            return this.leader;
+        }
+
+        int preferredReplica() {
+            return this.preferredReplica;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+    // Subset of registeredBrokers that is currently fenced.
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+    // Incremented per TopicRecord and decremented per RemoveTopicRecord.
+    private int topicCount = 0;
+    // Latest leader and preferred replica for every known partition.
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+    // Partitions whose leader is NO_LEADER.
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+    // Partitions whose leader differs from their preferred replica.
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+    // Destination for the computed metric values.
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    /**
+     * Replays every record of a batch, in iteration order, wrapping any failure
+     * in an IllegalArgumentException that reports the record's 1-based position.
+     */
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int position = 0;
+        for (ApiMessageAndVersion messageAndVersion : messages) {
+            position++;
+            try {
+                replay(messageAndVersion.message());
+            } catch (Exception e) {
+                throw new IllegalArgumentException(String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    messageAndVersion.message().getClass().getSimpleName(),
+                    position, messages.size(), baseOffset), e);
+            }
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * The ApiMessage must be one of the types covered by MetadataRecordType. Records that do not
+     * affect metrics are ignored; a type without a case here raises a RuntimeException.
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    /** Tracks the broker as registered, with the fenced state carried by the record. */
+    private void replay(RegisterBrokerRecord record) {
+        Integer id = record.brokerId();
+        if (!record.fenced()) {
+            fencedBrokers.remove(id);
+        } else {
+            fencedBrokers.add(id);
+        }
+        registeredBrokers.add(id);
+        updateBrokerStateMetrics();
+    }
+
+    /** Forgets the broker entirely: it is no longer counted as registered or fenced. */
+    private void replay(UnregisterBrokerRecord record) {
+        Integer id = record.brokerId();
+        fencedBrokers.remove(id);
+        registeredBrokers.remove(id);
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE); // throws if id is unregistered
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE); // throws if id is unregistered
+    }
+
+    /**
+     * Decodes the fenced field of the record and applies the resulting fencing change.
+     *
+     * @throws IllegalArgumentException if the fenced field holds an unknown value
+     */
+    private void replay(BrokerRegistrationChangeRecord record) {
+        int brokerId = record.brokerId();
+        BrokerRegistrationFencingChange change = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> new IllegalArgumentException(String.format(
+                "Registration change record for %d has unknown value for fenced field: %x",
+                brokerId,
+                record.fenced())));
+        handleFencingChange(brokerId, change);
+    }
+
+    /**
+     * Applies a fencing change to a broker; throws IllegalArgumentException if it is not registered.
+     */
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+            fencedBrokers.remove(brokerId);
+        } else {
+            return; // NONE: the fenced state is unchanged, so the broker metrics are already current
+        }
+        updateBrokerStateMetrics();
+    }
+
+    /** Publishes the fenced count and the count of registered brokers that are not fenced. */
+    private void updateBrokerStateMetrics() {
+        controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+        long active = registeredBrokers.stream().filter(id -> !fencedBrokers.contains(id)).count();
+        controllerMetrics.setActiveBrokerCount((int) active);
+    }
+
+    /** Counts one more topic and publishes the new total. */
+    private void replay(TopicRecord record) {
+        topicCount += 1;
+        controllerMetrics.setGlobalTopicCount(topicCount);
+    }
+
+    /** Records (or overwrites) the partition, taking its first replica as the preferred replica. */
+    private void replay(PartitionRecord record) {
+        TopicIdPartition partition = new TopicIdPartition(record.topicId(), record.partitionId());
+        int preferredReplica = record.replicas().get(0);
+        PartitionState state = new PartitionState(record.leader(), preferredReplica);
+
+        topicPartitions.put(partition, state);
+        updateBasedOnPartitionState(partition, state);
+        updateTopicAndPartitionMetrics();
+    }
+
+    /**
+     * Applies the leader and/or replica changes of the record to a known partition.
+     *
+     * A null replicas field or a NO_LEADER_CHANGE leader leaves the corresponding value untouched.
+     *
+     * @throws IllegalArgumentException if the partition is not currently tracked
+     */
+    private void replay(PartitionChangeRecord record) {
+        TopicIdPartition partition = new TopicIdPartition(record.topicId(), record.partitionId());
+        PartitionState current = topicPartitions.get(partition);
+        if (current == null) {
+            throw new IllegalArgumentException(String.format("Unknown topic partitions %s", partition));
+        }
+
+        // Start from the current state and overlay only the fields the record sets.
+        int preferredReplica = current.preferredReplica();
+        if (record.replicas() != null) {
+            preferredReplica = record.replicas().get(0);
+        }
+        int leader = record.leader() == NO_LEADER_CHANGE ? current.leader() : record.leader();
+
+        PartitionState updated = new PartitionState(leader, preferredReplica);
+        topicPartitions.put(partition, updated);
+        updateBasedOnPartitionState(partition, updated);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    /** Drops the topic and all of its partitions from the tracked state, then republishes metrics. */
+    private void replay(RemoveTopicRecord record) {
+        Uuid topicId = record.topicId();
+        // Compare ids by value: equal Uuids are generally distinct objects, so == would match nothing.
+        Predicate<TopicIdPartition> matchesTopic = tp -> tp.topicId().equals(topicId);
+        topicCount--;
+        topicPartitions.keySet().removeIf(matchesTopic);
+        offlineTopicPartitions.removeIf(matchesTopic);
+        imbalancedTopicPartitions.removeIf(matchesTopic);
+        updateTopicAndPartitionMetrics();
+    }
+
+    /** Reclassifies the partition as offline and/or imbalanced according to the given state. */
+    private void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {
+        if (partitionState.leader() != NO_LEADER) {
+            offlineTopicPartitions.remove(tp);
+        } else {
+            offlineTopicPartitions.add(tp);
+        }
+        if (partitionState.leader() != partitionState.preferredReplica()) {
+            imbalancedTopicPartitions.add(tp);
+        } else {
+            imbalancedTopicPartitions.remove(tp);
+        }
+    }
+
+    private void updateTopicAndPartitionMetrics() { // publishes all four topic/partition metrics
+        controllerMetrics.setGlobalTopicCount(topicCount);
+        controllerMetrics.setGlobalPartitionCount(topicPartitions.size());
+        controllerMetrics.setOfflinePartitionCount(offlineTopicPartitions.size());
+        controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedTopicPartitions.size());
+    }
+
+    /**
+     * Returns this manager to its initial, empty state.
+     *
+     * Every tracked collection and counter is cleared and the cleared values are published to the metrics.
+     */
+    void reset() {
+        fencedBrokers.clear();
+        registeredBrokers.clear();
+        imbalancedTopicPartitions.clear();
+        offlineTopicPartitions.clear();
+        topicPartitions.clear();
+        topicCount = 0;
+
+        updateBrokerStateMetrics();
+        updateTopicAndPartitionMetrics();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 158609af181..2744af5360a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.controller;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.IntPredicate;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.metadata.LeaderRecoveryState;
@@ -25,13 +30,6 @@ import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Function;
-
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
 
@@ -72,7 +70,7 @@ public class PartitionChangeBuilder {
     private final PartitionRegistration partition;
     private final Uuid topicId;
     private final int partitionId;
-    private final Function<Integer, Boolean> isAcceptableLeader;
+    private final IntPredicate isAcceptableLeader;
     private final boolean isLeaderRecoverySupported;
     private List<Integer> targetIsr;
     private List<Integer> targetReplicas;
@@ -84,7 +82,7 @@ public class PartitionChangeBuilder {
     public PartitionChangeBuilder(PartitionRegistration partition,
                                   Uuid topicId,
                                   int partitionId,
-                                  Function<Integer, Boolean> isAcceptableLeader,
+                                  IntPredicate isAcceptableLeader,
                                   boolean isLeaderRecoverySupported) {
         this.partition = partition;
         this.topicId = topicId;
@@ -197,7 +195,7 @@ public class PartitionChangeBuilder {
         if (election == Election.UNCLEAN) {
             // Attempt unclean leader election
             Optional<Integer> uncleanLeader = targetReplicas.stream()
-                .filter(replica -> isAcceptableLeader.apply(replica))
+                .filter(replica -> isAcceptableLeader.test(replica))
                 .findFirst();
             if (uncleanLeader.isPresent()) {
                 return new ElectionResult(uncleanLeader.get(), true);
@@ -208,7 +206,7 @@ public class PartitionChangeBuilder {
     }
 
     private boolean isValidNewLeader(int replica) {
-        return targetIsr.contains(replica) && isAcceptableLeader.apply(replica);
+        return targetIsr.contains(replica) && isAcceptableLeader.test(replica);
     }
 
     private void tryElection(PartitionChangeRecord record) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 642d6976dba..97e28299471 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -871,6 +871,9 @@ public final class QuorumController implements Controller {
                                 i++;
                             }
                         }
+
+                        controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
+
                         updateLastCommittedState(
                             offset,
                             epoch,
@@ -924,6 +927,7 @@ public final class QuorumController implements Controller {
                         for (ApiMessageAndVersion message : messages) {
                             try {
                                 replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
+                                controllerMetricsManager.replay(message.message());
                             } catch (Throwable e) {
                                 String failureMessage = String.format("Unable to apply %s record " +
                                         "from snapshot %s on standby controller, which was %d of " +
@@ -935,7 +939,9 @@ public final class QuorumController implements Controller {
                             i++;
                         }
                     }
-                    updateLastCommittedState(reader.lastContainedLogOffset(),
+
+                    updateLastCommittedState(
+                        reader.lastContainedLogOffset(),
                         reader.lastContainedLogEpoch(),
                         reader.lastContainedLogTimestamp());
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
@@ -1353,6 +1359,7 @@ public final class QuorumController implements Controller {
      */
     private void resetToEmptyState() {
         snapshotRegistry.reset();
+        controllerMetricsManager.reset();
 
         updateLastCommittedState(-1, -1, -1);
     }
@@ -1398,6 +1405,12 @@ public final class QuorumController implements Controller {
      */
     private final ControllerMetrics controllerMetrics;
 
+
+    /**
+     * Manager for updating controller metrics based on the committed metadata.
+     */
+    private final ControllerMetricsManager controllerMetricsManager;
+
     /**
      * A registry for snapshot data.  This must be accessed only by the event queue thread.
      */
@@ -1597,6 +1610,7 @@ public final class QuorumController implements Controller {
         this.queue = queue;
         this.time = time;
         this.controllerMetrics = controllerMetrics;
+        this.controllerMetricsManager = new ControllerMetricsManager(controllerMetrics);
         this.snapshotRegistry = new SnapshotRegistry(logContext);
         this.purgatory = new ControllerPurgatory();
         this.resourceExists = new ConfigResourceExistenceChecker();
@@ -1629,7 +1643,6 @@ public final class QuorumController implements Controller {
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(sessionTimeoutNs).
             setReplicaPlacer(replicaPlacer).
-            setControllerMetrics(controllerMetrics).
             setFeatureControlManager(featureControl).
             setZkMigrationEnabled(zkMigrationEnabled).
             build();
@@ -1644,7 +1657,6 @@ public final class QuorumController implements Controller {
             setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
             setConfigurationControl(configurationControl).
             setClusterControl(clusterControl).
-            setControllerMetrics(controllerMetrics).
             setCreateTopicPolicy(createTopicPolicy).
             setFeatureControl(featureControl).
             build();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index b96a687b0f3..00413c4bd53 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -49,7 +49,7 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
     private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
         "KafkaController", "PreferredReplicaImbalanceCount");
     private final static MetricName METADATA_ERROR_COUNT = getMetricName(
-            "KafkaController", "MetadataErrorCount");
+        "KafkaController", "MetadataErrorCount");
     private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
         "KafkaController", "LastAppliedRecordOffset");
     private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -67,7 +67,7 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
     private volatile int globalPartitionCount;
     private volatile int offlinePartitionCount;
     private volatile int preferredReplicaImbalanceCount;
-    private volatile AtomicInteger metadataErrorCount;
+    private final AtomicInteger metadataErrorCount;
     private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
     private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
     private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
@@ -215,12 +215,12 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
     }
 
     @Override
-    public void setGlobalTopicsCount(int topicCount) {
+    public void setGlobalTopicCount(int topicCount) {
         this.globalTopicCount = topicCount;
     }
 
     @Override
-    public int globalTopicsCount() {
+    public int globalTopicCount() {
         return this.globalTopicCount;
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 5fd9af85e06..63fda45d541 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -94,7 +94,6 @@ import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineHashSet;
-import org.apache.kafka.timeline.TimelineInteger;
 import org.slf4j.Logger;
 
 import java.util.AbstractMap.SimpleImmutableEntry;
@@ -111,7 +110,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
-import java.util.function.Function;
+import java.util.function.IntPredicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -149,7 +148,6 @@ public class ReplicationControlManager {
         private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
         private ConfigurationControlManager configurationControl = null;
         private ClusterControlManager clusterControl = null;
-        private ControllerMetrics controllerMetrics = null;
         private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
         private FeatureControlManager featureControl = null;
 
@@ -188,11 +186,6 @@ public class ReplicationControlManager {
             return this;
         }
 
-        Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
-            this.controllerMetrics = controllerMetrics;
-            return this;
-        }
-
         Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
             this.createTopicPolicy = createTopicPolicy;
             return this;
@@ -208,8 +201,6 @@ public class ReplicationControlManager {
                 throw new IllegalStateException("Configuration control must be set before building");
             } else if (clusterControl == null) {
                 throw new IllegalStateException("Cluster controller must be set before building");
-            } else if (controllerMetrics == null) {
-                throw new IllegalStateException("Metrics must be set before building");
             }
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
@@ -230,7 +221,6 @@ public class ReplicationControlManager {
                 maxElectionsPerImbalance,
                 configurationControl,
                 clusterControl,
-                controllerMetrics,
                 createTopicPolicy,
                 featureControl);
         }
@@ -296,11 +286,6 @@ public class ReplicationControlManager {
      */
     private final int maxElectionsPerImbalance;
 
-    /**
-     * A count of the total number of partitions in the cluster.
-     */
-    private final TimelineInteger globalPartitionCount;
-
     /**
      * A reference to the controller's configuration control manager.
      */
@@ -311,11 +296,6 @@ public class ReplicationControlManager {
      */
     private final ClusterControlManager clusterControl;
 
-    /**
-     * A reference to the controller's metrics registry.
-     */
-    private final ControllerMetrics controllerMetrics;
-
     /**
      * The policy to use to validate that topic assignments are valid, if one is present.
      */
@@ -381,7 +361,6 @@ public class ReplicationControlManager {
         int maxElectionsPerImbalance,
         ConfigurationControlManager configurationControl,
         ClusterControlManager clusterControl,
-        ControllerMetrics controllerMetrics,
         Optional<CreateTopicPolicy> createTopicPolicy,
         FeatureControlManager featureControl
     ) {
@@ -391,11 +370,9 @@ public class ReplicationControlManager {
         this.defaultNumPartitions = defaultNumPartitions;
         this.maxElectionsPerImbalance = maxElectionsPerImbalance;
         this.configurationControl = configurationControl;
-        this.controllerMetrics = controllerMetrics;
         this.createTopicPolicy = createTopicPolicy;
         this.featureControl = featureControl;
         this.clusterControl = clusterControl;
-        this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
         this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
         this.topicsWithCollisionChars = new TimelineHashMap<>(snapshotRegistry, 0);
         this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -417,7 +394,6 @@ public class ReplicationControlManager {
         }
         topics.put(record.topicId(),
             new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
-        controllerMetrics.setGlobalTopicsCount(topics.size());
         log.info("Created topic {} with topic ID {}.", record.name(), record.topicId());
     }
 
@@ -436,8 +412,6 @@ public class ReplicationControlManager {
             topicInfo.parts.put(record.partitionId(), newPartInfo);
             brokersToIsrs.update(record.topicId(), record.partitionId(), null,
                 newPartInfo.isr, NO_LEADER, newPartInfo.leader);
-            globalPartitionCount.increment();
-            controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
             updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(),
                     false,  newPartInfo.isReassigning());
         } else if (!newPartInfo.equals(prevPartInfo)) {
@@ -454,9 +428,6 @@ public class ReplicationControlManager {
         } else {
             imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
         }
-
-        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
-        controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
     }
 
     private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId,
@@ -505,9 +476,6 @@ public class ReplicationControlManager {
             imbalancedPartitions.add(new TopicIdPartition(record.topicId(), record.partitionId()));
         }
 
-        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
-        controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
-
         if (record.removingReplicas() != null || record.addingReplicas() != null) {
             log.info("Replayed partition assignment change {} for topic {}", record, topicInfo.name);
         } else if (log.isTraceEnabled()) {
@@ -548,15 +516,9 @@ public class ReplicationControlManager {
             }
 
             imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), partitionId));
-
-            globalPartitionCount.decrement();
         }
         brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
 
-        controllerMetrics.setGlobalTopicsCount(topics.size());
-        controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
-        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
-        controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
         log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
     }
 
@@ -659,7 +621,7 @@ public class ReplicationControlManager {
                 validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
                 replicationFactor = OptionalInt.of(assignment.brokerIds().size());
                 List<Integer> isr = assignment.brokerIds().stream().
-                    filter(clusterControl::active).collect(Collectors.toList());
+                    filter(clusterControl::isActive).collect(Collectors.toList());
                 if (isr.isEmpty()) {
                     return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
                         "All brokers specified in the manual partition assignment for " +
@@ -704,7 +666,7 @@ public class ReplicationControlManager {
                     PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId);
                     List<Integer> replicas = partitionAssignment.replicas();
                     List<Integer> isr = replicas.stream().
-                        filter(clusterControl::active).collect(Collectors.toList());
+                        filter(clusterControl::isActive).collect(Collectors.toList());
                     // If the ISR is empty, it means that all brokers are fenced or
                     // in controlled shutdown. To be consistent with the replica placer,
                     // we reject the create topic request with INVALID_REPLICATION_FACTOR.
@@ -983,7 +945,7 @@ public class ReplicationControlManager {
                     partition,
                     topic.id,
                     partitionId,
-                    clusterControl::active,
+                    clusterControl::isActive,
                     featureControl.metadataVersion().isLeaderRecoverySupported());
                 if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
                     builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
@@ -1359,7 +1321,7 @@ public class ReplicationControlManager {
         PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
             topicId,
             partitionId,
-            clusterControl::active,
+            clusterControl::isActive,
             featureControl.metadataVersion().isLeaderRecoverySupported());
         builder.setElection(election);
         Optional<ApiMessageAndVersion> record = builder.build();
@@ -1474,7 +1436,7 @@ public class ReplicationControlManager {
                 partition,
                 topicPartition.topicId(),
                 topicPartition.partitionId(),
-                clusterControl::active,
+                clusterControl::isActive,
                 featureControl.metadataVersion().isLeaderRecoverySupported()
             );
             builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
@@ -1557,7 +1519,7 @@ public class ReplicationControlManager {
                     OptionalInt.of(replicationFactor));
                 partitionAssignments.add(new PartitionAssignment(assignment.brokerIds()));
                 List<Integer> isr = assignment.brokerIds().stream().
-                    filter(clusterControl::active).collect(Collectors.toList());
+                    filter(clusterControl::isActive).collect(Collectors.toList());
                 if (isr.isEmpty()) {
                     throw new InvalidReplicaAssignmentException(
                         "All brokers specified in the manual partition assignment for " +
@@ -1577,7 +1539,7 @@ public class ReplicationControlManager {
             PartitionAssignment partitionAssignment = partitionAssignments.get(i);
             List<Integer> replicas = partitionAssignment.replicas();
             List<Integer> isr = isrs.get(i).stream().
-                filter(clusterControl::active).collect(Collectors.toList());
+                filter(clusterControl::isActive).collect(Collectors.toList());
             // If the ISR is empty, it means that all brokers are fenced or
             // in controlled shutdown. To be consistent with the replica placer,
             // we reject the create topic request with INVALID_REPLICATION_FACTOR.
@@ -1663,8 +1625,8 @@ public class ReplicationControlManager {
         // from the target ISR, but we need to exclude it here too, to handle the case
         // where there is an unclean leader election which chooses a leader from outside
         // the ISR.
-        Function<Integer, Boolean> isAcceptableLeader =
-            r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.active(r));
+        IntPredicate isAcceptableLeader =
+            r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
 
         while (iterator.hasNext()) {
             TopicIdPartition topicIdPart = iterator.next();
@@ -1791,7 +1753,7 @@ public class ReplicationControlManager {
         PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
             tp.topicId(),
             tp.partitionId(),
-            clusterControl::active,
+            clusterControl::isActive,
             featureControl.metadataVersion().isLeaderRecoverySupported());
         if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
@@ -1843,7 +1805,7 @@ public class ReplicationControlManager {
         PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
             tp.topicId(),
             tp.partitionId(),
-            clusterControl::active,
+            clusterControl::isActive,
             featureControl.metadataVersion().isLeaderRecoverySupported());
         if (!reassignment.merged().equals(currentReplicas)) {
             builder.setTargetReplicas(reassignment.merged());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 86918d5513c..c7dcbc5bfcd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -84,11 +84,10 @@ public class ClusterControlManagerTest {
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
-        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.isUnfenced(0));
 
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1);
         brokerRecord.endPoints().add(new BrokerEndpoint().
@@ -102,8 +101,8 @@ public class ClusterControlManagerTest {
             () -> clusterControl.checkBrokerEpoch(1, 101));
         assertThrows(StaleBrokerEpochException.class,
             () -> clusterControl.checkBrokerEpoch(2, 100));
-        assertFalse(clusterControl.unfenced(0));
-        assertFalse(clusterControl.unfenced(1));
+        assertFalse(clusterControl.isUnfenced(0));
+        assertFalse(clusterControl.isUnfenced(1));
 
         if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
             UnfenceBrokerRecord unfenceBrokerRecord =
@@ -114,8 +113,8 @@ public class ClusterControlManagerTest {
                     new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
             clusterControl.replay(changeRecord);
         }
-        assertFalse(clusterControl.unfenced(0));
-        assertTrue(clusterControl.unfenced(1));
+        assertFalse(clusterControl.isUnfenced(0));
+        assertTrue(clusterControl.isUnfenced(1));
 
         if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
             FenceBrokerRecord fenceBrokerRecord =
@@ -126,8 +125,8 @@ public class ClusterControlManagerTest {
                     new BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced(BrokerRegistrationFencingChange.FENCE.value());
             clusterControl.replay(changeRecord);
         }
-        assertFalse(clusterControl.unfenced(0));
-        assertFalse(clusterControl.unfenced(1));
+        assertFalse(clusterControl.isUnfenced(0));
+        assertFalse(clusterControl.isUnfenced(1));
     }
 
     @Test
@@ -147,11 +146,10 @@ public class ClusterControlManagerTest {
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
 
-        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.isUnfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
 
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
@@ -167,20 +165,20 @@ public class ClusterControlManagerTest {
             setHost("example.com"));
         clusterControl.replay(brokerRecord, 100L);
 
-        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.isUnfenced(0));
         assertTrue(clusterControl.inControlledShutdown(0));
 
         brokerRecord.setInControlledShutdown(false);
         clusterControl.replay(brokerRecord, 100L);
 
-        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.isUnfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
         assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
 
         brokerRecord.setFenced(false);
         clusterControl.replay(brokerRecord, 100L);
 
-        assertTrue(clusterControl.unfenced(0));
+        assertTrue(clusterControl.isUnfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
     }
 
@@ -201,11 +199,10 @@ public class ClusterControlManagerTest {
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
 
-        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.isUnfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
 
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
@@ -220,7 +217,7 @@ public class ClusterControlManagerTest {
             setHost("example.com"));
         clusterControl.replay(brokerRecord, 100L);
 
-        assertTrue(clusterControl.unfenced(0));
+        assertTrue(clusterControl.isUnfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
 
         BrokerRegistrationChangeRecord registrationChangeRecord = new BrokerRegistrationChangeRecord()
@@ -229,7 +226,7 @@ public class ClusterControlManagerTest {
             .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
         clusterControl.replay(registrationChangeRecord);
 
-        assertTrue(clusterControl.unfenced(0));
+        assertTrue(clusterControl.isUnfenced(0));
         assertTrue(clusterControl.inControlledShutdown(0));
 
         registrationChangeRecord = new BrokerRegistrationChangeRecord()
@@ -238,7 +235,7 @@ public class ClusterControlManagerTest {
             .setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
         clusterControl.replay(registrationChangeRecord);
 
-        assertTrue(clusterControl.unfenced(0));
+        assertTrue(clusterControl.isUnfenced(0));
         assertTrue(clusterControl.inControlledShutdown(0));
     }
 
@@ -257,7 +254,6 @@ public class ClusterControlManagerTest {
             setTime(new MockTime(0, 0, 0)).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
@@ -287,7 +283,6 @@ public class ClusterControlManagerTest {
             setTime(new MockTime(0, 0, 0)).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
@@ -343,7 +338,6 @@ public class ClusterControlManagerTest {
             setTime(new MockTime(0, 0, 0)).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
@@ -378,7 +372,6 @@ public class ClusterControlManagerTest {
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
@@ -397,7 +390,7 @@ public class ClusterControlManagerTest {
             clusterControl.heartbeatManager().touch(i, false, 0);
         }
         for (int i = 0; i < numUsableBrokers; i++) {
-            assertTrue(clusterControl.unfenced(i),
+            assertTrue(clusterControl.isUnfenced(i),
                 String.format("broker %d was not unfenced.", i));
         }
         for (int i = 0; i < 100; i++) {
@@ -432,11 +425,10 @@ public class ClusterControlManagerTest {
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
         clusterControl.activate();
-        assertFalse(clusterControl.unfenced(0));
+        assertFalse(clusterControl.isUnfenced(0));
         for (int i = 0; i < 3; i++) {
             RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(i).setRack(null);
@@ -511,7 +503,6 @@ public class ClusterControlManagerTest {
                 setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
                 setTime(new MockTime(0, 0, 0)).
                 setSnapshotRegistry(snapshotRegistry).
-                setControllerMetrics(new MockControllerMetrics()).
                 setFeatureControlManager(featureControl).
                 build();
         clusterControl.activate();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java
new file mode 100644
index 00000000000..8cfeea417be
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+final class ControllerMetricsManagerTest {
+    @Test
+    public void testActiveBrokerRegistration() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, false));
+        assertEquals(1, metrics.activeBrokerCount());
+        assertEquals(0, metrics.fencedBrokerCount());
+    }
+
+    @Test
+    public void testFenceBrokerRegistration() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, true));
+        assertEquals(0, metrics.activeBrokerCount());
+        assertEquals(1, metrics.fencedBrokerCount());
+    }
+
+    @Test
+    public void testBrokerChangedToActive() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, true));
+        manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.UNFENCE));
+        assertEquals(1, metrics.activeBrokerCount());
+        assertEquals(0, metrics.fencedBrokerCount());
+    }
+
+    @Test
+    public void testBrokerLegacyChangedToActive() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, true));
+        manager.replay(brokerUnfence(1, 1));
+        assertEquals(1, metrics.activeBrokerCount());
+        assertEquals(0, metrics.fencedBrokerCount());
+    }
+
+    @Test
+    public void testBrokerChangedToFence() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, false));
+        manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.FENCE));
+        assertEquals(0, metrics.activeBrokerCount());
+        assertEquals(1, metrics.fencedBrokerCount());
+    }
+
+
+    @Test
+    public void testBrokerLegacyChangedToFence() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, false));
+        manager.replay(brokerFence(1, 1));
+        assertEquals(0, metrics.activeBrokerCount());
+        assertEquals(1, metrics.fencedBrokerCount());
+    }
+
+    @Test
+    public void testBrokerUnchanged() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, true));
+        manager.replay(brokerChange(1, 1, BrokerRegistrationFencingChange.NONE));
+        assertEquals(0, metrics.activeBrokerCount());
+        assertEquals(1, metrics.fencedBrokerCount());
+    }
+
+    @Test
+    public void testBrokerUnregister() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, true));
+        manager.replay(brokerRegistration(2, 1, false));
+        assertEquals(1, metrics.activeBrokerCount());
+        assertEquals(1, metrics.fencedBrokerCount());
+        manager.replay(brokerUnregistration(1, 1));
+        assertEquals(1, metrics.activeBrokerCount());
+        assertEquals(0, metrics.fencedBrokerCount());
+        manager.replay(brokerUnregistration(2, 1));
+        assertEquals(0, metrics.activeBrokerCount());
+        assertEquals(0, metrics.fencedBrokerCount());
+    }
+
+    @Test
+    public void testReplayBatch() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replayBatch(
+            0,
+            Arrays.asList(
+                new ApiMessageAndVersion(brokerRegistration(1, 1, true), (short) 0),
+                new ApiMessageAndVersion(brokerChange(1, 1, BrokerRegistrationFencingChange.UNFENCE), (short) 0)
+            )
+        );
+        assertEquals(1, metrics.activeBrokerCount());
+        assertEquals(0, metrics.fencedBrokerCount());
+    }
+
+    @Test
+    public void testTopicCountIncreased() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(topicRecord("test"));
+        assertEquals(1, metrics.globalTopicCount());
+    }
+
+    @Test
+    public void testTopicCountDecreased() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+        
+        Uuid id = Uuid.randomUuid();
+        manager.replay(topicRecord("test", id));
+        manager.replay(removeTopicRecord(id));
+        assertEquals(0, metrics.globalTopicCount());
+    }
+
+    @Test
+    public void testPartitionCountIncreased() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        Uuid id = Uuid.randomUuid();
+        manager.replay(topicRecord("test", id));
+        assertEquals(0, metrics.globalPartitionCount());
+        manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+        assertEquals(1, metrics.globalPartitionCount());
+        manager.replay(partitionRecord(id, 1, 0, Arrays.asList(0, 1, 2)));
+        assertEquals(2, metrics.globalPartitionCount());
+    }
+
+    @Test
+    public void testPartitionCountDecreased() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        Uuid id = Uuid.randomUuid();
+        manager.replay(topicRecord("test", id));
+        manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+        manager.replay(partitionRecord(id, 1, 0, Arrays.asList(0, 1, 2)));
+        manager.replay(removeTopicRecord(id));
+        assertEquals(0, metrics.globalPartitionCount());
+    }
+
+    @Test
+    public void testOfflinePartition() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        Uuid id = Uuid.randomUuid();
+        manager.replay(topicRecord("test", id));
+        manager.replay(partitionRecord(id, 0, NO_LEADER, Arrays.asList(0, 1, 2)));
+        assertEquals(1, metrics.offlinePartitionCount());
+    }
+
+    @Test
+    public void testImbalancedPartition() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        Uuid id = Uuid.randomUuid();
+        manager.replay(topicRecord("test", id));
+        manager.replay(partitionRecord(id, 0, 1, Arrays.asList(0, 1, 2)));
+        assertEquals(1, metrics.preferredReplicaImbalanceCount());
+    }
+
+    @Test
+    public void testPartitionChange() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        Uuid id = Uuid.randomUuid();
+        manager.replay(topicRecord("test", id));
+        manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+
+        manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(NO_LEADER), Optional.empty()));
+        assertEquals(1, metrics.offlinePartitionCount());
+
+        manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(1), Optional.empty()));
+        assertEquals(0, metrics.offlinePartitionCount());
+        assertEquals(1, metrics.preferredReplicaImbalanceCount());
+
+        manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(0), Optional.empty()));
+        assertEquals(0, metrics.preferredReplicaImbalanceCount());
+
+        manager.replay(partitionChangeRecord(id, 0, OptionalInt.empty(), Optional.of(Arrays.asList(1, 2, 0))));
+        assertEquals(1, metrics.preferredReplicaImbalanceCount());
+
+        manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(2), Optional.of(Arrays.asList(2, 0, 1))));
+        assertEquals(0, metrics.preferredReplicaImbalanceCount());
+    }
+
+    @Test
+    public void testStartingMetrics() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        assertEquals(0, metrics.activeBrokerCount());
+        assertEquals(0, metrics.fencedBrokerCount());
+        assertEquals(0, metrics.globalTopicCount());
+        assertEquals(0, metrics.globalPartitionCount());
+        assertEquals(0, metrics.offlinePartitionCount());
+        assertEquals(0, metrics.preferredReplicaImbalanceCount());
+    }
+
+    @Test
+    public void testReset() {
+        ControllerMetrics metrics = new MockControllerMetrics();
+        ControllerMetricsManager manager = new ControllerMetricsManager(metrics);
+
+        manager.replay(brokerRegistration(1, 1, true));
+
+        Uuid id = Uuid.randomUuid();
+        manager.replay(topicRecord("test", id));
+        manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+
+        manager.reset();
+
+        assertEquals(0, metrics.activeBrokerCount());
+        assertEquals(0, metrics.fencedBrokerCount());
+        assertEquals(0, metrics.globalTopicCount());
+        assertEquals(0, metrics.globalPartitionCount());
+        assertEquals(0, metrics.offlinePartitionCount());
+        assertEquals(0, metrics.preferredReplicaImbalanceCount());
+    }
+
+    private static RegisterBrokerRecord brokerRegistration(
+        int brokerId,
+        long epoch,
+        boolean fenced
+    ) {
+        return new RegisterBrokerRecord()
+            .setBrokerId(brokerId)
+            .setIncarnationId(Uuid.randomUuid())
+            .setBrokerEpoch(epoch)
+            .setFenced(fenced);
+    }
+
+    private static UnregisterBrokerRecord brokerUnregistration(
+        int brokerId,
+        long epoch
+    ) {
+        return new UnregisterBrokerRecord()
+            .setBrokerId(brokerId)
+            .setBrokerEpoch(epoch);
+    }
+
+    private static BrokerRegistrationChangeRecord brokerChange(
+        int brokerId,
+        long epoch,
+        BrokerRegistrationFencingChange fencing
+    ) {
+        return new BrokerRegistrationChangeRecord()
+            .setBrokerId(brokerId)
+            .setBrokerEpoch(epoch)
+            .setFenced(fencing.value());
+    }
+
+    private static UnfenceBrokerRecord brokerUnfence(int brokerId, long epoch) {
+        return new UnfenceBrokerRecord()
+            .setId(brokerId)
+            .setEpoch(epoch);
+    }
+
+    private static FenceBrokerRecord brokerFence(int brokerId, long epoch) {
+        return new FenceBrokerRecord()
+            .setId(brokerId)
+            .setEpoch(epoch);
+    }
+
+    private static TopicRecord topicRecord(String name) {
+        return new TopicRecord().setName(name).setTopicId(Uuid.randomUuid());
+    }
+
+    private static TopicRecord topicRecord(String name, Uuid id) {
+        return new TopicRecord().setName(name).setTopicId(id);
+    }
+
+    private static RemoveTopicRecord removeTopicRecord(Uuid id) {
+        return new RemoveTopicRecord().setTopicId(id);
+    }
+
+    private static PartitionRecord partitionRecord(
+        Uuid id,
+        int partition,
+        int leader,
+        List<Integer> replicas
+    ) {
+        return new PartitionRecord()
+            .setPartitionId(partition)
+            .setTopicId(id)
+            .setReplicas(replicas)
+            .setIsr(replicas)
+            .setLeader(leader);
+    }
+
+    private static PartitionChangeRecord partitionChangeRecord(
+        Uuid id,
+        int partition,
+        OptionalInt leader,
+        Optional<List<Integer>> replicas
+    ) {
+        PartitionChangeRecord record = new PartitionChangeRecord();
+        leader.ifPresent(record::setLeader);
+        replicas.ifPresent(record::setReplicas);
+        replicas.ifPresent(record::setIsr);
+
+        return record
+            .setPartitionId(partition)
+            .setTopicId(id);
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index ca13d90ddea..4a0155ed1bd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -75,12 +75,12 @@ public final class MockControllerMetrics implements ControllerMetrics {
     }
 
     @Override
-    public void setGlobalTopicsCount(int topicCount) {
+    public void setGlobalTopicCount(int topicCount) {
         this.topics = topicCount;
     }
 
     @Override
-    public int globalTopicsCount() {
+    public int globalTopicCount() {
         return this.topics;
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index d0c16d1e3b0..b50083169e6 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -58,7 +58,6 @@ public class ProducerIdControlManagerTest {
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
-            setControllerMetrics(new MockControllerMetrics()).
             setFeatureControlManager(featureControl).
             build();
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 27c362e5438..3047e9bdc8e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -268,7 +268,7 @@ public class QuorumControllerTest {
 
             // Brokers are only registered and should still be fenced
             allBrokers.forEach(brokerId -> {
-                assertFalse(active.clusterControl().unfenced(brokerId),
+                assertFalse(active.clusterControl().isUnfenced(brokerId),
                     "Broker " + brokerId + " should have been fenced");
             });
 
@@ -288,7 +288,7 @@ public class QuorumControllerTest {
             TestUtils.waitForCondition(() -> {
                     sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
                     for (Integer brokerId : brokersToFence) {
-                        if (active.clusterControl().unfenced(brokerId)) {
+                        if (active.clusterControl().isUnfenced(brokerId)) {
                             return false;
                         }
                     }
@@ -302,11 +302,11 @@ public class QuorumControllerTest {
 
             // At this point only the brokers we want fenced should be fenced.
             brokersToKeepUnfenced.forEach(brokerId -> {
-                assertTrue(active.clusterControl().unfenced(brokerId),
+                assertTrue(active.clusterControl().isUnfenced(brokerId),
                     "Broker " + brokerId + " should have been unfenced");
             });
             brokersToFence.forEach(brokerId -> {
-                assertFalse(active.clusterControl().unfenced(brokerId),
+                assertFalse(active.clusterControl().isUnfenced(brokerId),
                     "Broker " + brokerId + " should have been fenced");
             });
 
@@ -367,7 +367,7 @@ public class QuorumControllerTest {
 
             // Brokers are only registered and should still be fenced
             allBrokers.forEach(brokerId -> {
-                assertFalse(active.clusterControl().unfenced(brokerId),
+                assertFalse(active.clusterControl().isUnfenced(brokerId),
                     "Broker " + brokerId + " should have been fenced");
             });
 
@@ -387,7 +387,7 @@ public class QuorumControllerTest {
                 () -> {
                     sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
                     for (Integer brokerId : brokersToFence) {
-                        if (active.clusterControl().unfenced(brokerId)) {
+                        if (active.clusterControl().isUnfenced(brokerId)) {
                             return false;
                         }
                     }
@@ -402,11 +402,11 @@ public class QuorumControllerTest {
 
             // At this point only the brokers we want fenced should be fenced.
             brokersToKeepUnfenced.forEach(brokerId -> {
-                assertTrue(active.clusterControl().unfenced(brokerId),
+                assertTrue(active.clusterControl().isUnfenced(brokerId),
                     "Broker " + brokerId + " should have been unfenced");
             });
             brokersToFence.forEach(brokerId -> {
-                assertFalse(active.clusterControl().unfenced(brokerId),
+                assertFalse(active.clusterControl().isUnfenced(brokerId),
                     "Broker " + brokerId + " should have been fenced");
             });
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0997792cd90..d35cd6de2fe 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -151,7 +151,6 @@ public class ReplicationControlManagerTest {
         final LogContext logContext = new LogContext();
         final MockTime time = new MockTime();
         final MockRandom random = new MockRandom();
-        final ControllerMetrics metrics = new MockControllerMetrics();
         final FeatureControlManager featureControl = new FeatureControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
             setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
@@ -165,7 +164,6 @@ public class ReplicationControlManagerTest {
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS)).
             setReplicaPlacer(new StripedReplicaPlacer(random)).
-            setControllerMetrics(metrics).
             setFeatureControlManager(featureControl).
             build();
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager.Builder().
@@ -206,7 +204,6 @@ public class ReplicationControlManagerTest {
                 setMaxElectionsPerImbalance(Integer.MAX_VALUE).
                 setConfigurationControl(configurationControl).
                 setClusterControl(clusterControl).
-                setControllerMetrics(metrics).
                 setCreateTopicPolicy(createTopicPolicy).
                 setFeatureControl(featureControl).
                 build();
@@ -610,41 +607,6 @@ public class ReplicationControlManagerTest {
         );
     }
 
-    @Test
-    public void testBrokerCountMetrics() throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
-        ReplicationControlManager replicationControl = ctx.replicationControl;
-
-        ctx.registerBrokers(0);
-
-        assertEquals(1, ctx.metrics.fencedBrokerCount());
-        assertEquals(0, ctx.metrics.activeBrokerCount());
-
-        ctx.unfenceBrokers(0);
-
-        assertEquals(0, ctx.metrics.fencedBrokerCount());
-        assertEquals(1, ctx.metrics.activeBrokerCount());
-
-        ctx.registerBrokers(1);
-        ctx.unfenceBrokers(1);
-
-        assertEquals(2, ctx.metrics.activeBrokerCount());
-
-        ctx.registerBrokers(2);
-        ctx.unfenceBrokers(2);
-
-        assertEquals(0, ctx.metrics.fencedBrokerCount());
-        assertEquals(3, ctx.metrics.activeBrokerCount());
-
-        ControllerResult<Void> result = replicationControl.unregisterBroker(0);
-        ctx.replay(result.records());
-        result = replicationControl.unregisterBroker(2);
-        ctx.replay(result.records());
-
-        assertEquals(0, ctx.metrics.fencedBrokerCount());
-        assertEquals(1, ctx.metrics.activeBrokerCount());
-    }
-
     @Test
     public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -718,107 +680,6 @@ public class ReplicationControlManagerTest {
         assertEquals(4, ctx.replicationControl.getTopic(recreatedTopic.topicId()).numPartitions(Long.MAX_VALUE));
     }
 
-    @Test
-    public void testGlobalTopicAndPartitionMetrics() throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
-        ReplicationControlManager replicationControl = ctx.replicationControl;
-        CreateTopicsRequestData request = new CreateTopicsRequestData();
-        request.topics().add(new CreatableTopic().setName("foo").
-            setNumPartitions(1).setReplicationFactor((short) -1));
-
-        ctx.registerBrokers(0, 1, 2);
-        ctx.unfenceBrokers(0, 1, 2);
-
-        List<Uuid> topicsToDelete = new ArrayList<>();
-
-        ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(request, Collections.singleton("foo"));
-        topicsToDelete.add(result.response().topics().find("foo").topicId());
-
-        RecordTestUtils.replayAll(replicationControl, result.records());
-        assertEquals(1, ctx.metrics.globalTopicsCount());
-
-        request = new CreateTopicsRequestData();
-        request.topics().add(new CreatableTopic().setName("bar").
-            setNumPartitions(1).setReplicationFactor((short) -1));
-        request.topics().add(new CreatableTopic().setName("baz").
-            setNumPartitions(2).setReplicationFactor((short) -1));
-        result = replicationControl.createTopics(request,
-            new HashSet<>(Arrays.asList("bar", "baz")));
-        RecordTestUtils.replayAll(replicationControl, result.records());
-        assertEquals(3, ctx.metrics.globalTopicsCount());
-        assertEquals(4, ctx.metrics.globalPartitionCount());
-
-        topicsToDelete.add(result.response().topics().find("baz").topicId());
-        ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.deleteTopics(topicsToDelete);
-        RecordTestUtils.replayAll(replicationControl, deleteResult.records());
-        assertEquals(1, ctx.metrics.globalTopicsCount());
-        assertEquals(1, ctx.metrics.globalPartitionCount());
-
-        Uuid topicToDelete = result.response().topics().find("bar").topicId();
-        deleteResult = replicationControl.deleteTopics(Collections.singletonList(topicToDelete));
-        RecordTestUtils.replayAll(replicationControl, deleteResult.records());
-        assertEquals(0, ctx.metrics.globalTopicsCount());
-        assertEquals(0, ctx.metrics.globalPartitionCount());
-    }
-
-    @Test
-    public void testOfflinePartitionAndReplicaImbalanceMetrics() throws Exception {
-        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
-        ReplicationControlManager replicationControl = ctx.replicationControl;
-        ctx.registerBrokers(0, 1, 2, 3);
-        ctx.unfenceBrokers(0, 1, 2, 3);
-
-        CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][] {
-            new int[] {0, 2}, new int[] {0, 1}});
-
-        CreatableTopicResult zar = ctx.createTestTopic("zar", new int[][] {
-            new int[] {0, 1, 2}, new int[] {1, 2, 3}, new int[] {1, 2, 0}});
-
-        ControllerResult<Void> result = replicationControl.unregisterBroker(0);
-        ctx.replay(result.records());
-
-        // All partitions should still be online after unregistering broker 0
-        assertEquals(0, ctx.metrics.offlinePartitionCount());
-        // Three partitions should not have their preferred (first) replica 0
-        assertEquals(3, ctx.metrics.preferredReplicaImbalanceCount());
-
-        result = replicationControl.unregisterBroker(1);
-        ctx.replay(result.records());
-
-        // After unregistering broker 1, 1 partition for topic foo should go offline
-        assertEquals(1, ctx.metrics.offlinePartitionCount());
-        // All five partitions should not have their preferred (first) replica at this point
-        assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount());
-
-        result = replicationControl.unregisterBroker(2);
-        ctx.replay(result.records());
-
-        // After unregistering broker 2, the last partition for topic foo should go offline
-        // and 2 partitions for topic zar should go offline
-        assertEquals(4, ctx.metrics.offlinePartitionCount());
-
-        result = replicationControl.unregisterBroker(3);
-        ctx.replay(result.records());
-
-        // After unregistering broker 3 the last partition for topic zar should go offline
-        assertEquals(5, ctx.metrics.offlinePartitionCount());
-
-        // Deleting topic foo should bring the offline partition count down to 3
-        ArrayList<ApiMessageAndVersion> records = new ArrayList<>();
-        replicationControl.deleteTopic(foo.topicId(), records);
-        ctx.replay(records);
-
-        assertEquals(3, ctx.metrics.offlinePartitionCount());
-
-        // Deleting topic zar should bring the offline partition count down to 0
-        records = new ArrayList<>();
-        replicationControl.deleteTopic(zar.topicId(), records);
-        ctx.replay(records);
-
-        assertEquals(0, ctx.metrics.offlinePartitionCount());
-    }
-
     @Test
     public void testValidateNewTopicNames() {
         Map<String, ApiError> topicErrors = new HashMap<>();
@@ -2092,7 +1953,6 @@ public class ReplicationControlManagerTest {
             new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
 
         assertTrue(replication.arePartitionLeadersImbalanced());
-        assertEquals(2, ctx.metrics.preferredReplicaImbalanceCount());
 
         ctx.unfenceBrokers(1);
 
@@ -2124,7 +1984,6 @@ public class ReplicationControlManagerTest {
             .setLeader(1);
         assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, (short) 0)), balanceResult.records());
         assertTrue(replication.arePartitionLeadersImbalanced());
-        assertEquals(1, ctx.metrics.preferredReplicaImbalanceCount());
         assertFalse(balanceResult.response());
 
         ctx.unfenceBrokers(0);
@@ -2157,7 +2016,6 @@ public class ReplicationControlManagerTest {
             .setLeader(0);
         assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, (short) 0)), balanceResult.records());
         assertFalse(replication.arePartitionLeadersImbalanced());
-        assertEquals(0, ctx.metrics.preferredReplicaImbalanceCount());
         assertFalse(balanceResult.response());
     }
 
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 0df9c8b8f56..25177d45d71 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -264,7 +264,7 @@ public class SnapshotRegistry {
     }
 
     /**
-     * Associate with this registry.
+     * Associate a revertable with this registry.
      */
     public void register(Revertable revertable) {
         revertables.add(revertable);