Posted to commits@kafka.apache.org by cm...@apache.org on 2023/07/13 16:45:58 UTC

[kafka] branch KAFKA-15183 updated (fba82ef3bf7 -> 2816888c74b)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch KAFKA-15183
in repository https://gitbox.apache.org/repos/asf/kafka.git


    omit fba82ef3bf7 KAFKA-15183: Add more controller, loader, snapshot emitter metrics
     new 2816888c74b KAFKA-15183: Add more controller, loader, snapshot emitter metrics

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (fba82ef3bf7)
            \
             N -- N -- N   refs/heads/KAFKA-15183 (2816888c74b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 checkstyle/import-control-metadata.xml | 1 +
 1 file changed, 1 insertion(+)


[kafka] 01/01: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch KAFKA-15183
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2816888c74bc4f275f2abe94feaba7ad78b22599
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Wed Jul 12 11:26:03 2023 -0700

    KAFKA-15183: Add more controller, loader, snapshot emitter metrics
    
    Implement some of the metrics from KIP-938: Add more metrics for
    measuring KRaft performance.
    
    Add these metrics to QuorumControllerMetrics:
        kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
        kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
        kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
        kafka.controller:type=KafkaController,name=NewActiveControllersCount
    
    Create LoaderMetrics with these new metrics:
        kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
        kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
    
    Create SnapshotEmitterMetrics with these new metrics:
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs
---
 checkstyle/import-control-metadata.xml             |  16 +-
 .../src/main/scala/kafka/server/SharedServer.scala |  30 ++-
 .../server/metadata/BrokerServerMetrics.scala      |  11 +-
 .../apache/kafka/controller/QuorumController.java  |  54 ++++--
 .../metrics/QuorumControllerMetrics.java           |  83 +++++++-
 .../apache/kafka/image/loader/MetadataLoader.java  |  66 ++++---
 .../kafka/image/loader/MetadataLoaderMetrics.java  |  46 -----
 .../loader/metrics/MetadataLoaderMetrics.java      | 137 +++++++++++++
 .../publisher/metrics/SnapshotEmitterMetrics.java  | 108 +++++++++++
 .../QuorumControllerIntegrationTestUtils.java      | 213 +++++++++++++++++++++
 .../QuorumControllerMetricsIntegrationTest.java    | 202 +++++++++++++++++++
 .../kafka/controller/QuorumControllerTest.java     | 122 +-----------
 .../metrics/QuorumControllerMetricsTest.java       |   9 +-
 .../kafka/image/loader/MetadataLoaderTest.java     |  24 +++
 .../loader/metrics/MetadataLoaderMetricsTest.java  | 148 ++++++++++++++
 15 files changed, 1042 insertions(+), 227 deletions(-)
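
Once registered, the new gauges can be read back over JMX using the object
names given in the commit message above. A minimal sketch (not part of this
patch; it assumes the default Yammer JMX reporter is running in the same JVM,
which exposes each gauge through a single "Value" attribute):

    import java.lang.management.ManagementFactory;

    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class ReadControllerMetric {
        public static void main(String[] args) throws Exception {
            // Query one of the new gauges by the object name listed in the
            // commit message above.
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName(
                "kafka.controller:type=KafkaController,name=NewActiveControllersCount");
            // Yammer's JMX reporter exposes each gauge as a "Value" attribute.
            Object value = server.getAttribute(name, "Value");
            System.out.println("NewActiveControllersCount = " + value);
        }
    }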

diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml
index 2cbb5504293..464006f0e38 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -63,7 +63,6 @@
     </subpackage>
 
     <subpackage name="controller">
-        <allow pkg="com.yammer.metrics"/>
         <allow pkg="org.apache.kafka.clients" />
         <allow pkg="org.apache.kafka.clients.admin" />
         <allow pkg="org.apache.kafka.common.acl" />
@@ -73,7 +72,6 @@
         <allow pkg="org.apache.kafka.common.internals" />
         <allow pkg="org.apache.kafka.common.message" />
         <allow pkg="org.apache.kafka.common.metadata" />
-        <allow pkg="org.apache.kafka.common.metrics" />
         <allow pkg="org.apache.kafka.common.network" />
         <allow pkg="org.apache.kafka.common.protocol" />
         <allow pkg="org.apache.kafka.common.quota" />
@@ -93,13 +91,17 @@
         <allow pkg="org.apache.kafka.server.common" />
         <allow pkg="org.apache.kafka.server.config" />
         <allow pkg="org.apache.kafka.server.fault" />
-        <allow pkg="org.apache.kafka.server.metrics" />
         <allow pkg="org.apache.kafka.server.mutable" />
         <allow pkg="org.apache.kafka.server.policy"/>
         <allow pkg="org.apache.kafka.server.util"/>
         <allow pkg="org.apache.kafka.snapshot" />
         <allow pkg="org.apache.kafka.test" />
         <allow pkg="org.apache.kafka.timeline" />
+        <subpackage name="metrics">
+            <allow pkg="com.yammer.metrics"/>
+            <allow pkg="org.apache.kafka.common.metrics" />
+            <allow pkg="org.apache.kafka.server.metrics" />
+        </subpackage>
     </subpackage>
 
     <subpackage name="image">
@@ -122,6 +124,14 @@
         <allow pkg="org.apache.kafka.server.util" />
         <allow pkg="org.apache.kafka.snapshot" />
         <allow pkg="org.apache.kafka.test" />
+        <subpackage name="loader">
+            <subpackage name="metrics">
+                <allow pkg="com.yammer.metrics"/>
+                <allow pkg="org.apache.kafka.common.metrics" />
+                <allow pkg="org.apache.kafka.controller.metrics" />
+                <allow pkg="org.apache.kafka.server.metrics" />
+            </subpackage>
+        </subpackage>
     </subpackage>
 
     <subpackage name="metadata">
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index e58b33a8d57..892c528885a 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -25,7 +25,9 @@ import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
+import org.apache.kafka.image.MetadataProvenance
 import org.apache.kafka.image.loader.MetadataLoader
+import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
 import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -34,7 +36,7 @@ import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, Process
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.util
-import java.util.{Collections, Optional}
+import java.util.Optional
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 
@@ -106,6 +108,7 @@ class SharedServer(
   val snapshotsDisabledReason = new AtomicReference[String](null)
   @volatile var snapshotEmitter: SnapshotEmitter = _
   @volatile var snapshotGenerator: SnapshotGenerator = _
+  @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _
 
   def isUsed(): Boolean = synchronized {
     usedByController || usedByBroker
@@ -259,15 +262,24 @@ class SharedServer(
         raftManager = _raftManager
         _raftManager.startup()
 
+        metadataLoaderMetrics = if (brokerMetrics != null) {
+          new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+            elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
+            batchSize => brokerMetrics.updateBatchSize(batchSize),
+            brokerMetrics.lastAppliedImageProvenance)
+        } else {
+          new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+            _ => {},
+            _ => {},
+            new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))
+        }
         val loaderBuilder = new MetadataLoader.Builder().
           setNodeId(metaProps.nodeId).
           setTime(time).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           setFaultHandler(metadataLoaderFaultHandler).
-          setHighWaterMarkAccessor(() => _raftManager.client.highWatermark())
-        if (brokerMetrics != null) {
-          loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
-        }
+          setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
+          setMetrics(metadataLoaderMetrics)
         loader = loaderBuilder.build()
         snapshotEmitter = new SnapshotEmitter.Builder().
           setNodeId(metaProps.nodeId).
@@ -282,15 +294,15 @@ class SharedServer(
           setDisabledReason(snapshotsDisabledReason).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           build()
-        _raftManager.register(loader)
         try {
-          loader.installPublishers(Collections.singletonList(snapshotGenerator))
+          loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get()
         } catch {
           case t: Throwable => {
             error("Unable to install metadata publishers", t)
             throw new RuntimeException("Unable to install metadata publishers.", t)
           }
         }
+        _raftManager.register(loader)
         debug("Completed SharedServer startup.")
         started = true
       } catch {
@@ -326,6 +338,10 @@ class SharedServer(
         CoreUtils.swallow(loader.close(), this)
         loader = null
       }
+      if (metadataLoaderMetrics != null) {
+        CoreUtils.swallow(metadataLoaderMetrics.close(), this)
+        metadataLoaderMetrics = null
+      }
       if (snapshotGenerator != null) {
         CoreUtils.swallow(snapshotGenerator.close(), this)
         snapshotGenerator = null
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 212909101f2..ff183324166 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -23,7 +23,6 @@ import org.apache.kafka.common.metrics.Gauge
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.MetricConfig
 import org.apache.kafka.image.MetadataProvenance
-import org.apache.kafka.image.loader.MetadataLoaderMetrics
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 
 import java.util.Collections
@@ -31,7 +30,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
 
 final class BrokerServerMetrics private (
   metrics: Metrics
-) extends MetadataLoaderMetrics {
+) extends AutoCloseable {
   import BrokerServerMetrics._
 
   private val batchProcessingTimeHistName = KafkaMetricsGroup.explicitMetricName("kafka.server",
@@ -123,15 +122,15 @@ final class BrokerServerMetrics private (
     ).foreach(metrics.removeMetric)
   }
 
-  override def updateBatchProcessingTime(elapsedNs: Long): Unit =
+  def updateBatchProcessingTime(elapsedNs: Long): Unit =
     batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs))
 
-  override def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
+  def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
 
-  override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
+  def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
     lastAppliedImageProvenance.set(provenance)
 
-  override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
+  def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
 
   def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 582d774e6e9..2f32c3638df 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -461,6 +461,12 @@ public final class QuorumController implements Controller {
     private Throwable handleEventException(String name,
                                            OptionalLong startProcessingTimeNs,
                                            Throwable exception) {
+        if (!startProcessingTimeNs.isPresent() &&
+                ControllerExceptions.isTimeoutException(exception)) {
+            // If the event never started, and the exception is a timeout, increment the timed
+            // out metric.
+            controllerMetrics.incrementOperationsTimedOut();
+        }
         Throwable externalException =
                 ControllerExceptions.toExternalException(exception, () -> latestController());
         if (!startProcessingTimeNs.isPresent()) {
@@ -492,6 +498,15 @@ public final class QuorumController implements Controller {
         return externalException;
     }
 
+    private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
+        long now = time.nanoseconds();
+        controllerMetrics.incrementOperationsStarted();
+        if (eventCreatedTimeNs.isPresent()) {
+            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs.getAsLong()));
+        }
+        return now;
+    }
+
     /**
      * A controller event for handling internal state changes, such as Raft inputs.
      */
@@ -508,9 +523,8 @@ public final class QuorumController implements Controller {
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
-            startProcessingTimeNs = OptionalLong.of(now);
+            startProcessingTimeNs = OptionalLong.of(
+                updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
             log.debug("Executing {}.", this);
             handler.run();
             handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
@@ -527,11 +541,16 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void appendControlEvent(String name, Runnable handler) {
+    void appendControlEvent(String name, Runnable handler) {
         ControllerEvent event = new ControllerEvent(name, handler);
         queue.append(event);
     }
 
+    void appendControlEventWithDeadline(String name, Runnable handler, long deadlineNs) {
+        ControllerEvent event = new ControllerEvent(name, handler);
+        queue.appendWithDeadline(deadlineNs, event);
+    }
+
     /**
      * A controller event that reads the committed internal state in order to expose it
      * to an API.
@@ -555,9 +574,8 @@ public final class QuorumController implements Controller {
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
-            startProcessingTimeNs = OptionalLong.of(now);
+            startProcessingTimeNs = OptionalLong.of(
+                updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
             T value = handler.get();
             handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
             future.complete(value);
@@ -692,12 +710,11 @@ public final class QuorumController implements Controller {
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
-                // We exclude deferred events from the event queue time metric to prevent
-                // incorrectly including the deferral time in the queue time.
-                controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
-            }
+            // Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to prevent incorrectly
+            // including their deferral time in the event queue time.
+            startProcessingTimeNs = OptionalLong.of(
+                updateEventStartMetricsAndGetTime(flags.contains(DOES_NOT_UPDATE_QUEUE_TIME) ?
+                    OptionalLong.empty() : OptionalLong.of(eventCreatedTimeNs)));
             int controllerEpoch = curClaimEpoch;
             if (!isActiveController(controllerEpoch)) {
                 throw ControllerExceptions.newWrongControllerException(latestController());
@@ -706,7 +723,6 @@ public final class QuorumController implements Controller {
                 log.info("Cannot run write operation {} in pre-migration mode. Returning NOT_CONTROLLER.", name);
                 throw ControllerExceptions.newPreMigrationException(latestController());
             }
-            startProcessingTimeNs = OptionalLong.of(now);
             ControllerResult<T> result = op.generateRecordsAndResult();
             if (result.records().isEmpty()) {
                 op.processBatchEndOffset(writeOffset);
@@ -1063,6 +1079,9 @@ public final class QuorumController implements Controller {
             appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> {
                 final String newLeaderName = newLeader.leaderId().isPresent() ?
                         String.valueOf(newLeader.leaderId().getAsInt()) : "(none)";
+                if (newLeader.leaderId().isPresent()) {
+                    controllerMetrics.incrementNewActiveControllers();
+                }
                 if (isActiveController()) {
                     if (newLeader.isLeader(nodeId)) {
                         log.warn("We were the leader in epoch {}, and are still the leader " +
@@ -1308,7 +1327,7 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void renounce() {
+    void renounce() {
         try {
             if (curClaimEpoch == -1) {
                 throw new RuntimeException("Cannot renounce leadership because we are not the " +
@@ -2307,4 +2326,9 @@ public final class QuorumController implements Controller {
     Time time() {
         return time;
     }
+
+    // VisibleForTesting
+    QuorumControllerMetrics controllerMetrics() {
+        return controllerMetrics;
+    }
 }
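
The queue metrics above are reachable from tests through the package-private
controllerMetrics() accessor added in this hunk. A minimal sketch of a
hypothetical probe built on it (the class and method names here are invented
for illustration):

    package org.apache.kafka.controller;

    import org.apache.kafka.controller.metrics.QuorumControllerMetrics;

    // Hypothetical helper, not part of this patch. It sits in the
    // org.apache.kafka.controller package so that it can reach the
    // package-private controllerMetrics() accessor added above.
    final class EventMetricsProbe {
        // True once at least one queue operation has started and at least
        // one has expired before starting -- the only path that increments
        // the timed-out counter in handleEventException().
        static boolean sawStartAndTimeout(QuorumController controller) {
            QuorumControllerMetrics metrics = controller.controllerMetrics();
            return metrics.operationsStarted() > 0
                && metrics.operationsTimedOut() > 0;
        }
    }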
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 566245ed096..a95fa11d1c1 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -52,6 +52,14 @@ public class QuorumControllerMetrics implements AutoCloseable {
         "KafkaController", "LastAppliedRecordTimestamp");
     private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
         "KafkaController", "LastAppliedRecordLagMs");
+    private final static MetricName TIMED_OUT_BROKER_HEARTBEAT_COUNT = getMetricName(
+        "KafkaController", "TimedOutBrokerHeartbeatCount");
+    private final static MetricName EVENT_QUEUE_OPERATIONS_STARTED_COUNT = getMetricName(
+        "KafkaController", "EventQueueOperationsStartedCount");
+    private final static MetricName EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT = getMetricName(
+        "KafkaController", "EventQueueOperationsTimedOutCount");
+    private final static MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
+        "KafkaController", "NewActiveControllersCount");
 
     private final Optional<MetricsRegistry> registry;
     private volatile boolean active;
@@ -61,6 +69,9 @@ public class QuorumControllerMetrics implements AutoCloseable {
     private final Consumer<Long> eventQueueTimeUpdater;
     private final Consumer<Long> eventQueueProcessingTimeUpdater;
     private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
+    private final AtomicLong operationsStarted = new AtomicLong(0);
+    private final AtomicLong operationsTimedOut = new AtomicLong(0);
+    private final AtomicLong newActiveControllers = new AtomicLong(0);
 
     private Consumer<Long> newHistogram(MetricName name, boolean biased) {
         if (registry.isPresent()) {
@@ -109,6 +120,30 @@ public class QuorumControllerMetrics implements AutoCloseable {
                 return time.milliseconds() - lastAppliedRecordTimestamp();
             }
         }));
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return operationsStarted();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return operationsTimedOut();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(NEW_ACTIVE_CONTROLLERS_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return newActiveControllers();
+            }
+        }));
     }
 
     public void setActive(boolean active) {
@@ -152,17 +187,53 @@ public class QuorumControllerMetrics implements AutoCloseable {
     }
 
     public void incrementTimedOutHeartbeats() {
-        timedOutHeartbeats.addAndGet(1);
+        timedOutHeartbeats.incrementAndGet();
     }
 
-    public void setTimedOutHeartbeats(long heartbeats) {
-        timedOutHeartbeats.set(heartbeats);
+    public void setTimedOutHeartbeats(long value) {
+        timedOutHeartbeats.set(value);
     }
 
     public long timedOutHeartbeats() {
         return timedOutHeartbeats.get();
     }
 
+    public void incrementOperationsStarted() {
+        operationsStarted.incrementAndGet();
+    }
+
+    public void setOperationsStarted(long value) {
+        operationsStarted.set(value);
+    }
+
+    public long operationsStarted() {
+        return operationsStarted.get();
+    }
+
+    public void incrementOperationsTimedOut() {
+        operationsTimedOut.incrementAndGet();
+    }
+
+    public void setOperationsTimedOut(long value) {
+        operationsTimedOut.set(value);
+    }
+
+    public long operationsTimedOut() {
+        return operationsTimedOut.get();
+    }
+
+    public void incrementNewActiveControllers() {
+        newActiveControllers.incrementAndGet();
+    }
+
+    public void setNewActiveControllers(long value) {
+        newActiveControllers.set(value);
+    }
+
+    public long newActiveControllers() {
+        return newActiveControllers.get();
+    }
+
     @Override
     public void close() {
         registry.ifPresent(r -> Arrays.asList(
@@ -172,7 +243,11 @@ public class QuorumControllerMetrics implements AutoCloseable {
             LAST_APPLIED_RECORD_OFFSET,
             LAST_COMMITTED_RECORD_OFFSET,
             LAST_APPLIED_RECORD_TIMESTAMP,
-            LAST_APPLIED_RECORD_LAG_MS
+            LAST_APPLIED_RECORD_LAG_MS,
+            TIMED_OUT_BROKER_HEARTBEAT_COUNT,
+            EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
+            EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
+            NEW_ACTIVE_CONTROLLERS_COUNT
         ).forEach(r::removeMetric));
     }
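
Taken on its own, the counter surface of this class is easy to exercise. A
minimal sketch, assuming the (Optional, Time) constructor shown in the tests
later in this patch; Optional.empty() skips Yammer registration, while
Optional.of(KafkaYammerMetrics.defaultRegistry()) would expose the gauges:

    import java.util.Optional;

    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.controller.metrics.QuorumControllerMetrics;

    public class CounterSketch {
        public static void main(String[] args) {
            try (QuorumControllerMetrics metrics =
                     new QuorumControllerMetrics(Optional.empty(), Time.SYSTEM)) {
                metrics.incrementOperationsStarted();
                metrics.incrementOperationsTimedOut();
                metrics.incrementNewActiveControllers();
                System.out.println(metrics.operationsStarted());    // 1
                System.out.println(metrics.operationsTimedOut());   // 1
                System.out.println(metrics.newActiveControllers()); // 1
            }
        }
    }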
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 768fcb2574b..c2c066418b9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.image.writer.ImageReWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
@@ -35,14 +36,17 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.fault.FaultHandlerException;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.Snapshots;
 import org.slf4j.Logger;
 
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -69,28 +73,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
         private Time time = Time.SYSTEM;
         private LogContext logContext = null;
         private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e);
-        private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
-            private volatile long lastAppliedOffset = -1L;
-
-            @Override
-            public void updateBatchProcessingTime(long elapsedNs) { }
-
-            @Override
-            public void updateBatchSize(int size) { }
-
-            @Override
-            public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
-                this.lastAppliedOffset = provenance.lastContainedOffset();
-            }
-
-            @Override
-            public long lastAppliedOffset() {
-                return lastAppliedOffset;
-            }
-
-            @Override
-            public void close() throws Exception { }
-        };
+        private MetadataLoaderMetrics metrics = null;
         private Supplier<OptionalLong> highWaterMarkAccessor = null;
 
         public Builder setNodeId(int nodeId) {
@@ -113,13 +96,13 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
             return this;
         }
 
-        public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) {
-            this.metrics = metrics;
+        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) {
+            this.highWaterMarkAccessor = highWaterMarkAccessor;
             return this;
         }
 
-        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) {
-            this.highWaterMarkAccessor = highWaterMarkAccessor;
+        public Builder setMetrics(MetadataLoaderMetrics metrics) {
+            this.metrics = metrics;
             return this;
         }
 
@@ -130,6 +113,12 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
             if (highWaterMarkAccessor == null) {
                 throw new RuntimeException("You must set the high water mark accessor.");
             }
+            if (metrics == null) {
+                metrics = new MetadataLoaderMetrics(Optional.empty(),
+                    __ -> { },
+                    __ -> { },
+                    new AtomicReference<>(MetadataProvenance.EMPTY));
+            }
             return new MetadataLoader(
                 time,
                 logContext,
@@ -221,6 +210,11 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                 new ShutdownEvent());
     }
 
+    // VisibleForTesting
+    MetadataLoaderMetrics metrics() {
+        return metrics;
+    }
+
     private boolean stillNeedToCatchUp(String where, long offset) {
         if (!catchingUp) {
             log.trace("{}: we are not in the initial catching up state.", where);
@@ -349,6 +343,9 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                     }
                 }
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                if (delta.featuresDelta() != null) {
+                    metrics.setCurrentMetadataVersion(image.features().metadataVersion());
+                }
                 if (uninitializedPublishers.isEmpty()) {
                     scheduleInitializeNewPublishers(0);
                 }
@@ -406,7 +403,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
         MetadataProvenance provenance =
                 new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
         long elapsedNs = time.nanoseconds() - startNs;
-        metrics.updateBatchProcessingTime(elapsedNs);
+        metrics.updateBatchProcessingTimeNs(elapsedNs);
         return new LogDeltaManifest(provenance,
                 currentLeaderAndEpoch,
                 numBatches,
@@ -418,24 +415,30 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
     public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
         eventQueue.append(() -> {
             try {
+                long numLoaded = metrics.incrementHandleLoadSnapshotCount();
+                String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
+                log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
+                    snapshotName, numLoaded);
                 MetadataDelta delta = new MetadataDelta.Builder().
                         setImage(image).
                         build();
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
-                log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " +
-                        "in {} us.", manifest.provenance().lastContainedOffset(),
+                log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " +
+                        "and this snapshot in {} us.", snapshotName,
+                        image.provenance().lastContainedOffset(),
                         NANOSECONDS.toMicros(manifest.elapsedNs()));
                 try {
                     image = delta.apply(manifest.provenance());
                 } catch (Throwable e) {
                     faultHandler.handleFault("Error generating new metadata image from " +
-                            "snapshot at offset " + reader.lastContainedLogOffset(), e);
+                            "snapshot " + snapshotName, e);
                     return;
                 }
                 if (stillNeedToCatchUp("handleLoadSnapshot", manifest.provenance().lastContainedOffset())) {
                     return;
                 }
-                log.info("handleLoadSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
+                log.info("handleLoadSnapshot({}): publishing new snapshot image to {} publisher(s).",
+                        snapshotName, publishers.size());
                 for (MetadataPublisher publisher : publishers.values()) {
                     try {
                         publisher.onMetadataUpdate(delta, image, manifest);
@@ -446,6 +449,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                     }
                 }
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                metrics.setCurrentMetadataVersion(image.features().metadataVersion());
                 if (uninitializedPublishers.isEmpty()) {
                     scheduleInitializeNewPublishers(0);
                 }
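
With these builder changes, setMetrics() is optional: build() falls back to a
metrics object with an empty registry and no-op updaters. A minimal sketch of
the resulting call pattern (the node id, thread prefix, and high watermark
values are invented for illustration):

    import java.util.OptionalLong;

    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.image.loader.MetadataLoader;

    public class LoaderBuilderSketch {
        public static void main(String[] args) throws Exception {
            MetadataLoader loader = new MetadataLoader.Builder().
                setNodeId(1).
                setTime(Time.SYSTEM).
                setThreadNamePrefix("kafka-1-").
                setHighWaterMarkAccessor(() -> OptionalLong.of(100L)).
                build();
            // No setMetrics() call: build() supplied the no-op default.
            loader.close();
        }
    }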
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
deleted file mode 100644
index 654bc9dd505..00000000000
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.image.loader;
-
-import org.apache.kafka.image.MetadataProvenance;
-
-
-/**
- * An interface for the metadata loader metrics.
- */
-public interface MetadataLoaderMetrics extends AutoCloseable {
-    /**
-     * Update the batch processing time histogram.
-     */
-    void updateBatchProcessingTime(long elapsedNs);
-
-    /**
-     * Update the batch size histogram.
-     */
-    void updateBatchSize(int size);
-
-    /**
-     * Set the provenance of the last image which has been processed by all publishers.
-     */
-    void updateLastAppliedImageProvenance(MetadataProvenance provenance);
-
-    /**
-     * Retrieve the last offset which has been processed by all publishers.
-     */
-    long lastAppliedOffset();
-}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
new file mode 100644
index 00000000000..351ac4fc5c3
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * These are the metrics which are managed by the MetadataLoader class.
+ */
+public final class MetadataLoaderMetrics implements AutoCloseable {
+    private final static MetricName CURRENT_METADATA_VERSION = getMetricName(
+        "MetadataLoader", "CurrentMetadataVersion");
+    private final static MetricName HANDLE_LOAD_SNAPSHOT_COUNT = getMetricName(
+        "MetadataLoader", "HandleLoadSnapshotCount");
+
+    private final Optional<MetricsRegistry> registry;
+    private final AtomicReference<MetadataVersion> currentMetadataVersion =
+            new AtomicReference<>(MetadataVersion.MINIMUM_KRAFT_VERSION);
+    private final AtomicLong handleLoadSnapshotCount = new AtomicLong(0);
+    private final Consumer<Long> batchProcessingTimeNsUpdater;
+    private final Consumer<Integer> batchSizesUpdater;
+    private final AtomicReference<MetadataProvenance> lastAppliedProvenance;
+
+    /**
+     * Create a new MetadataLoaderMetrics object.
+     *
+     * @param registry                      The metrics registry, or Optional.empty if this is a
+     *                                      test and we don't have one.
+     * @param batchProcessingTimeNsUpdater  Updates the batch processing time histogram.
+     * @param batchSizesUpdater             Updates the batch sizes histogram.
+     */
+    public MetadataLoaderMetrics(
+        Optional<MetricsRegistry> registry,
+        Consumer<Long> batchProcessingTimeNsUpdater,
+        Consumer<Integer> batchSizesUpdater,
+        AtomicReference<MetadataProvenance> lastAppliedProvenance
+    ) {
+        this.registry = registry;
+        this.batchProcessingTimeNsUpdater = batchProcessingTimeNsUpdater;
+        this.batchSizesUpdater = batchSizesUpdater;
+        this.lastAppliedProvenance = lastAppliedProvenance;
+        registry.ifPresent(r -> r.newGauge(CURRENT_METADATA_VERSION, new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return Integer.valueOf(currentMetadataVersion().featureLevel());
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(HANDLE_LOAD_SNAPSHOT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return handleLoadSnapshotCount();
+            }
+        }));
+    }
+
+    /**
+     * Update the batch processing time histogram.
+     */
+    public void updateBatchProcessingTimeNs(long elapsedNs) {
+        batchProcessingTimeNsUpdater.accept(elapsedNs);
+    }
+
+    /**
+     * Update the batch size histogram.
+     */
+    public void updateBatchSize(int size) {
+        batchSizesUpdater.accept(size);
+    }
+
+    /**
+     * Set the provenance of the last image which has been processed by all publishers.
+     */
+    public void updateLastAppliedImageProvenance(MetadataProvenance lastAppliedProvenance) {
+        this.lastAppliedProvenance.set(lastAppliedProvenance);
+    }
+
+    /**
+     * Retrieve the last offset which has been processed by all publishers.
+     */
+    public long lastAppliedOffset() {
+        return this.lastAppliedProvenance.get().lastContainedOffset();
+    }
+
+    public void setCurrentMetadataVersion(MetadataVersion metadataVersion) {
+        this.currentMetadataVersion.set(metadataVersion);
+    }
+
+    public MetadataVersion currentMetadataVersion() {
+        return this.currentMetadataVersion.get();
+    }
+
+    public long incrementHandleLoadSnapshotCount() {
+        return this.handleLoadSnapshotCount.incrementAndGet();
+    }
+
+    public long handleLoadSnapshotCount() {
+        return this.handleLoadSnapshotCount.get();
+    }
+
+    @Override
+    public void close() {
+        registry.ifPresent(r -> Arrays.asList(
+            CURRENT_METADATA_VERSION,
+            HANDLE_LOAD_SNAPSHOT_COUNT
+        ).forEach(r::removeMetric));
+    }
+
+    private static MetricName getMetricName(String type, String name) {
+        return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
+    }
+}
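
A minimal usage sketch for the new class (all names come from the file above;
the no-op updaters mirror what MetadataLoader.Builder installs by default, and
registering against the default Yammer registry is an illustrative choice):

    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.kafka.image.MetadataProvenance;
    import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
    import org.apache.kafka.server.common.MetadataVersion;
    import org.apache.kafka.server.metrics.KafkaYammerMetrics;

    public class LoaderMetricsSketch {
        public static void main(String[] args) {
            MetadataLoaderMetrics metrics = new MetadataLoaderMetrics(
                Optional.of(KafkaYammerMetrics.defaultRegistry()),
                elapsedNs -> { },  // no-op batch processing time updater
                batchSize -> { },  // no-op batch size updater
                new AtomicReference<>(MetadataProvenance.EMPTY));
            metrics.setCurrentMetadataVersion(MetadataVersion.latest());
            metrics.incrementHandleLoadSnapshotCount();
            System.out.println(metrics.handleLoadSnapshotCount()); // 1
            metrics.close();
        }
    }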
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
new file mode 100644
index 00000000000..0a2cd308463
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These are the metrics which are managed by the SnapshotEmitter class.
+ */
+public final class SnapshotEmitterMetrics implements AutoCloseable {
+    private final static MetricName LATEST_SNAPSHOT_GENERATED_BYTES = getMetricName(
+        "SnapshotEmitter", "LatestSnapshotGeneratedBytes");
+    private final static MetricName LATEST_SNAPSHOT_GENERATED_AGE_MS = getMetricName(
+        "SnapshotEmitter", "LatestSnapshotGeneratedAgeMs");
+
+    private final Optional<MetricsRegistry> registry;
+    private final Time time;
+    private final AtomicLong latestSnapshotGeneratedBytes;
+    private final AtomicLong latestSnapshotGeneratedTimeMs;
+
+    /**
+     * Create a new SnapshotEmitterMetrics object.
+     *
+     * @param registry  The metrics registry, or Optional.empty if this is a test and we don't have one.
+     */
+    public SnapshotEmitterMetrics(
+        Optional<MetricsRegistry> registry,
+        Time time,
+        long initialLatestSnapshotGeneratedBytes
+    ) {
+        this.registry = registry;
+        this.time = time;
+        this.latestSnapshotGeneratedBytes = new AtomicLong(initialLatestSnapshotGeneratedBytes);
+        this.latestSnapshotGeneratedTimeMs = new AtomicLong(monoTimeInMs());
+        registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_BYTES, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return latestSnapshotGeneratedBytes();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_AGE_MS, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return latestSnapshotGeneratedAgeMs();
+            }
+        }));
+    }
+
+    long monoTimeInMs() {
+        return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds());
+    }
+
+    public void setLatestSnapshotGeneratedBytes(long value) {
+        this.latestSnapshotGeneratedBytes.set(value);
+    }
+
+    public long latestSnapshotGeneratedBytes() {
+        return this.latestSnapshotGeneratedBytes.get();
+    }
+
+    public void setLatestSnapshotGeneratedTimeMs(long timeMs) {
+        this.latestSnapshotGeneratedTimeMs.set(timeMs);
+    }
+
+    public long latestSnapshotGeneratedTimeMs() {
+        return this.latestSnapshotGeneratedTimeMs.get();
+    }
+
+    public long latestSnapshotGeneratedAgeMs() {
+        return monoTimeInMs() - this.latestSnapshotGeneratedTimeMs.get();
+    }
+
+    @Override
+    public void close() {
+        registry.ifPresent(r -> Arrays.asList(
+            LATEST_SNAPSHOT_GENERATED_BYTES,
+            LATEST_SNAPSHOT_GENERATED_AGE_MS
+        ).forEach(r::removeMetric));
+    }
+
+    private static MetricName getMetricName(String type, String name) {
+        return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
+    }
+}
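
A minimal usage sketch (MockTime is the deterministic clock from the client
test utilities, an assumption here; the byte count is invented for
illustration):

    import java.util.Optional;

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics;

    public class EmitterMetricsSketch {
        public static void main(String[] args) {
            MockTime time = new MockTime();
            SnapshotEmitterMetrics metrics =
                new SnapshotEmitterMetrics(Optional.empty(), time, 0L);
            metrics.setLatestSnapshotGeneratedBytes(4096L);
            time.sleep(1000);
            System.out.println(metrics.latestSnapshotGeneratedBytes()); // 4096
            System.out.println(metrics.latestSnapshotGeneratedAgeMs()); // 1000
            metrics.close();
        }
    }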
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
new file mode 100644
index 00000000000..83758c0fff1
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+/**
+ * Utility functions for use in QuorumController integration tests.
+ */
+class QuorumControllerIntegrationTestUtils {
+    private final static Logger log = LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);
+
+    static BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
+        return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
+    }
+
+    /**
+     * Create a broker features collection for use in a registration request. We only set MV here.
+     *
+     * @param minVersion    The minimum supported MV.
+     * @param maxVersion    The maximum supported MV.
+     */
+    static BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
+        MetadataVersion minVersion,
+        MetadataVersion maxVersion
+    ) {
+        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
+        features.add(new BrokerRegistrationRequestData.Feature()
+                         .setName(MetadataVersion.FEATURE_NAME)
+                         .setMinSupportedVersion(minVersion.featureLevel())
+                         .setMaxSupportedVersion(maxVersion.featureLevel()));
+        return features;
+    }
+
+    /**
+     * Register the given number of brokers.
+     *
+     * @param controller    The active controller.
+     * @param numBrokers    The number of brokers to register. We will start at 0 and increment.
+     *
+     * @return              A map from broker IDs to broker epochs.
+     */
+    static Map<Integer, Long> registerBrokersAndUnfence(
+        QuorumController controller,
+        int numBrokers
+    ) throws Exception {
+        Map<Integer, Long> brokerEpochs = new HashMap<>();
+        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+            BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT,
+                new BrokerRegistrationRequestData()
+                    .setBrokerId(brokerId)
+                    .setRack(null)
+                    .setClusterId(controller.clusterId())
+                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
+                    .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
+                    .setListeners(new ListenerCollection(
+                        Arrays.asList(
+                            new Listener()
+                                .setName("PLAINTEXT")
+                                .setHost("localhost")
+                                .setPort(9092 + brokerId)
+                            ).iterator()
+                        )
+                    )
+            ).get();
+            brokerEpochs.put(brokerId, reply.epoch());
+
+            // Send heartbeat to unfence
+            controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
+                new BrokerHeartbeatRequestData()
+                    .setWantFence(false)
+                    .setBrokerEpoch(brokerEpochs.get(brokerId))
+                    .setBrokerId(brokerId)
+                    .setCurrentMetadataOffset(100000L)
+            ).get();
+        }
+
+        return brokerEpochs;
+    }
+
+    /**
+     * Send broker heartbeats for the provided brokers.
+     *
+     * @param controller    The active controller.
+     * @param brokers       The broker IDs to send heartbeats for.
+     * @param brokerEpochs  A map from broker ID to broker epoch.
+     */
+    static void sendBrokerHeartbeat(
+        QuorumController controller,
+        List<Integer> brokers,
+        Map<Integer, Long> brokerEpochs
+    ) throws Exception {
+        if (brokers.isEmpty()) {
+            return;
+        }
+        for (Integer brokerId : brokers) {
+            BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
+                new BrokerHeartbeatRequestData()
+                    .setWantFence(false)
+                    .setBrokerEpoch(brokerEpochs.get(brokerId))
+                    .setBrokerId(brokerId)
+                    .setCurrentMetadataOffset(100000)
+            ).get();
+            assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
+        }
+    }
+
+    /**
+     * Create some topics directly on the controller.
+     *
+     * @param controller            The active controller.
+     * @param prefix                The prefix to use for topic names.
+     * @param numTopics             The number of topics to create.
+     * @param replicationFactor     The replication factor to use.
+     */
+    static void createTopics(
+        QuorumController controller,
+        String prefix,
+        int numTopics,
+        int replicationFactor
+    ) throws Exception {
+        HashSet<String> describable = new HashSet<>();
+        for (int i = 0; i < numTopics; i++) {
+            describable.add(prefix + i);
+        }
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        for (int i = 0; i < numTopics; i++) {
+            request.topics().add(
+                new CreatableTopic().
+                    setName(prefix + i).
+                    setNumPartitions(-1).
+                    setReplicationFactor((short) replicationFactor));
+        }
+        CreateTopicsResponseData response =
+            controller.createTopics(ANONYMOUS_CONTEXT, request, describable).get();
+        for (int i = 0; i < numTopics; i++) {
+            CreatableTopicResult result = response.topics().find(prefix + i);
+            assertEquals((short) 0, result.errorCode());
+        }
+    }
+
+    /**
+     * Add an event to the controller event queue that will pause it temporarily.
+     *
+     * @param controller    The controller.
+     * @return              The latch that can be used to unpause the controller.
+     */
+    static CountDownLatch pause(QuorumController controller) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        controller.appendControlEvent("pause", () -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                log.info("Interrupted while waiting for unpause.", e);
+            }
+        });
+        return latch;
+    }
+
+    /**
+     * Force the current controller to renounce.
+     *
+     * @param controller    The controller.
+     */
+    static void forceRenounce(QuorumController controller) throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        controller.appendControlEvent("forceRenounce", () -> {
+            controller.renounce();
+            future.complete(null);
+        });
+        future.get();
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
new file mode 100644
index 00000000000..5cd6fe4b1b9
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.forceRenounce;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class QuorumControllerMetricsIntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(QuorumControllerMetricsIntegrationTest.class);
+
+    static class MockControllerMetrics extends QuorumControllerMetrics {
+        final AtomicBoolean closed = new AtomicBoolean(false);
+
+        MockControllerMetrics() {
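+            // No MetricsRegistry is supplied, so nothing is registered with yammer;
+            // the in-memory counters these tests read still update normally.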
+            super(Optional.empty(), Time.SYSTEM);
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            closed.set(true);
+        }
+    }
+
+    /**
+     * Test that closing the QuorumController closes the metrics object.
+     */
+    @Test
+    public void testClosingQuorumControllerClosesMetrics() throws Throwable {
+        MockControllerMetrics metrics = new MockControllerMetrics();
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setMetrics(metrics);
+                }).
+                build()
+        ) {
+            assertEquals(1, controlEnv.activeController().controllerMetrics().newActiveControllers());
+        }
+        assertTrue(metrics.closed.get(), "metrics were not closed");
+    }
+
+    /**
+     * Test that failing over to a new controller increments NewActiveControllersCount on both the
+     * active and inactive controllers.
+     */
+    @Test
+    public void testFailingOverIncrementsNewActiveControllerCount() throws Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                build()
+        ) {
+            controlEnv.activeController(); // wait for a controller to become active.
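+            // Inactive controllers observe the new leader asynchronously, so poll until
+            // every controller's counter reflects the first election.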
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                for (QuorumController controller : controlEnv.controllers()) {
+                    assertEquals(1, controller.controllerMetrics().newActiveControllers());
+                }
+            });
+            forceRenounce(controlEnv.activeController());
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                for (QuorumController controller : controlEnv.controllers()) {
+                    assertEquals(2, controller.controllerMetrics().newActiveControllers());
+                }
+            });
+        }
+    }
+
+    /**
+     * Test the heartbeat and general operation timeout metrics.
+     * These are incremented on the active controller only.
+     */
+    @Test
+    public void testTimeoutMetrics() throws Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);
+            assertEquals(0L, active.controllerMetrics().timedOutHeartbeats());
+            assertEquals(0L, active.controllerMetrics().operationsTimedOut());
+
+            // We pause the controller so that the heartbeat event will definitely be expired
+            // rather than processed. The request deadline below is the current time, so the
+            // heartbeat is already past its deadline once the controller resumes.
+            CountDownLatch latch = pause(active);
+            ControllerRequestContext expiredTimeoutContext = new ControllerRequestContext(
+                new RequestHeaderData(),
+                KafkaPrincipal.ANONYMOUS,
+                OptionalLong.of(active.time().nanoseconds()));
+            CompletableFuture<BrokerHeartbeatReply> replyFuture =
+                active.processBrokerHeartbeat(expiredTimeoutContext,
+                    new BrokerHeartbeatRequestData()
+                        .setWantFence(false)
+                        .setBrokerEpoch(brokerEpochs.get(0))
+                        .setBrokerId(0)
+                        .setCurrentMetadataOffset(100000));
+            latch.countDown(); // Unpause the controller.
+            assertEquals(TimeoutException.class,
+                assertThrows(ExecutionException.class, () -> replyFuture.get()).
+                    getCause().getClass());
+            assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
+            assertEquals(1L, active.controllerMetrics().operationsTimedOut());
+
+            // Inject another operation whose deadline (the current time) has already passed.
+            CountDownLatch latch2 = pause(active);
+            active.appendControlEventWithDeadline("fakeTimeoutOperation",
+                () -> { },
+                active.time().nanoseconds());
+            latch2.countDown();
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                // The fake timeout increments operationsTimedOut but not timedOutHeartbeats.
+                assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
+                assertEquals(2L, active.controllerMetrics().operationsTimedOut());
+            });
+            for (QuorumController controller : controlEnv.controllers()) {
+                // Inactive controllers don't set these metrics.
+                if (!controller.isActive()) {
+                    assertFalse(controller.controllerMetrics().active());
+                    assertEquals(0L, controller.controllerMetrics().timedOutHeartbeats());
+                    assertEquals(0L, controller.controllerMetrics().operationsTimedOut());
+                }
+            }
+        }
+    }
+
+    /**
+     * Test the event queue operations started metric.
+     */
+    @Test
+    public void testEventQueueOperationsStartedMetric() throws Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+            registerBrokersAndUnfence(active, 3);
+
+            // Test that a new operation increments operationsStarted. We retry if needed,
+            // to handle the case where another operation runs between reading
+            // expectedOperationsStarted and executing the new control event.
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                long expectedOperationsStarted = active.controllerMetrics().operationsStarted() + 1;
+                CompletableFuture<Long> actualOperationsStarted = new CompletableFuture<>();
+                active.appendControlEvent("checkOperationsStarted", () -> {
+                    actualOperationsStarted.complete(active.controllerMetrics().operationsStarted());
+                });
+                assertEquals(expectedOperationsStarted, actualOperationsStarted.get());
+            });
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index d55ff5f67c2..261d3c91b28 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -88,7 +87,6 @@ import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
-import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
@@ -124,6 +122,10 @@ import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
 import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -140,39 +142,6 @@ public class QuorumControllerTest {
     static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
             fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");
 
-    static class MockControllerMetrics extends QuorumControllerMetrics {
-        final AtomicBoolean closed = new AtomicBoolean(false);
-
-        MockControllerMetrics() {
-            super(Optional.empty(), Time.SYSTEM);
-        }
-
-        @Override
-        public void close() {
-            super.close();
-            closed.set(true);
-        }
-    }
-
-    /**
-     * Test creating a new QuorumController and closing it.
-     */
-    @Test
-    public void testCreateAndClose() throws Throwable {
-        MockControllerMetrics metrics = new MockControllerMetrics();
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
-                build();
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setMetrics(metrics);
-                }).
-                build()
-        ) {
-        }
-        assertTrue(metrics.closed.get(), "metrics were not closed");
-    }
-
     /**
      * Test setting some configuration values and reading them back.
      */
@@ -610,22 +579,6 @@ public class QuorumControllerTest {
         }
     }
 
-    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
-        return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
-    }
-
-    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
-        MetadataVersion minVersion,
-        MetadataVersion maxVersion
-    ) {
-        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
-        features.add(new BrokerRegistrationRequestData.Feature()
-            .setName(MetadataVersion.FEATURE_NAME)
-            .setMinSupportedVersion(minVersion.featureLevel())
-            .setMaxSupportedVersion(maxVersion.featureLevel()));
-        return features;
-    }
-
     private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(
         MetadataVersion minVersion,
         MetadataVersion maxVersion
@@ -782,7 +735,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController controller = controlEnv.activeController();
-            CountDownLatch countDownLatch = controller.pause();
+            CountDownLatch countDownLatch = pause(controller);
             long now = controller.time().nanoseconds();
             ControllerRequestContext context0 = new ControllerRequestContext(
                 new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
@@ -846,7 +799,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController controller = controlEnv.activeController();
-            CountDownLatch countDownLatch = controller.pause();
+            CountDownLatch countDownLatch = pause(controller);
             CompletableFuture<CreateTopicsResponseData> createFuture =
                 controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
                     setTimeoutMs(120000), Collections.emptySet());
@@ -891,7 +844,7 @@ public class QuorumControllerTest {
         ) {
             QuorumController controller = controlEnv.activeController();
 
-            Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers);
+            Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(controller, numBrokers);
 
             // Create a lot of partitions
             List<CreatableReplicaAssignment> partitions = IntStream
@@ -980,62 +933,6 @@ public class QuorumControllerTest {
         }
     }
 
-    private Map<Integer, Long> registerBrokers(QuorumController controller, int numBrokers) throws Exception {
-        Map<Integer, Long> brokerEpochs = new HashMap<>();
-        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
-            BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT,
-                new BrokerRegistrationRequestData()
-                    .setBrokerId(brokerId)
-                    .setRack(null)
-                    .setClusterId(controller.clusterId())
-                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
-                    .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
-                    .setListeners(
-                        new ListenerCollection(
-                            Arrays.asList(
-                                new Listener()
-                                .setName("PLAINTEXT")
-                                .setHost("localhost")
-                                .setPort(9092 + brokerId)
-                                ).iterator()
-                            )
-                        )
-                    ).get();
-            brokerEpochs.put(brokerId, reply.epoch());
-
-            // Send heartbeat to unfence
-            controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
-                new BrokerHeartbeatRequestData()
-                    .setWantFence(false)
-                    .setBrokerEpoch(brokerEpochs.get(brokerId))
-                    .setBrokerId(brokerId)
-                    .setCurrentMetadataOffset(100000L)
-            ).get();
-        }
-
-        return brokerEpochs;
-    }
-
-    private void sendBrokerHeartbeat(
-        QuorumController controller,
-        List<Integer> brokers,
-        Map<Integer, Long> brokerEpochs
-    ) throws Exception {
-        if (brokers.isEmpty()) {
-            return;
-        }
-        for (Integer brokerId : brokers) {
-            BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
-                new BrokerHeartbeatRequestData()
-                    .setWantFence(false)
-                    .setBrokerEpoch(brokerEpochs.get(brokerId))
-                    .setBrokerId(brokerId)
-                    .setCurrentMetadataOffset(100000)
-            ).get();
-            assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
-        }
-    }
-
     @Test
     public void testConfigResourceExistenceChecker() throws Throwable {
         try (
@@ -1048,7 +945,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController active = controlEnv.activeController();
-            registerBrokers(active, 5);
+            registerBrokersAndUnfence(active, 5);
             active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
                 setTopics(new CreatableTopicCollection(Collections.singleton(
                     new CreatableTopic().setName("foo").
@@ -1508,7 +1405,8 @@ public class QuorumControllerTest {
 
         featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
         assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
-        assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());    }
+        assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
+    }
 
     @Test
     public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 619100f1ed8..5bba7d0658b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -40,14 +40,17 @@ public class QuorumControllerMetricsTest {
             try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
                 ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
                     new HashSet<>(Arrays.asList(
-                        "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
                         "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
+                        "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
                         "kafka.controller:type=KafkaController,name=ActiveControllerCount",
-                        "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+                        "kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
+                        "kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
                         "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
                         "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
                         "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset"
+                        "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+                        "kafka.controller:type=KafkaController,name=NewActiveControllersCount",
+                        "kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount"
                     )));
             }
             ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 1eefa15a8c3..372700b1fb6 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -55,6 +56,7 @@ import java.util.stream.Collectors;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -251,6 +253,13 @@ public class MetadataLoaderTest {
                     )
                 );
                 loader.handleLoadSnapshot(snapshotReader);
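+                // The snapshot counter is updated on the loader's event thread, so poll
+                // for it rather than asserting immediately.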
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    assertEquals(1L, loader.metrics().handleLoadSnapshotCount());
+                });
+            } else {
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    assertEquals(0L, loader.metrics().handleLoadSnapshotCount());
+                });
             }
             loader.waitForAllEventsToBeHandled();
             if (sameObject) {
@@ -328,6 +337,8 @@ public class MetadataLoaderTest {
             assertEquals(300L, loader.lastAppliedOffset());
             assertEquals(new SnapshotManifest(new MetadataProvenance(300, 100, 4000), 3000000L),
                 publishers.get(0).latestSnapshotManifest);
+            assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION,
+                loader.metrics().currentMetadataVersion());
         }
         assertTrue(publishers.get(0).closed);
         assertEquals(MetadataVersion.IBP_3_0_IV1,
@@ -587,14 +598,27 @@ public class MetadataLoaderTest {
 
             loadTestSnapshot(loader, 200);
             assertEquals(200L, loader.lastAppliedOffset());
+            assertEquals(IBP_3_3_IV1.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
             assertFalse(publishers.get(0).latestDelta.image().isEmpty());
 
             loadTestSnapshot2(loader, 400);
             assertEquals(400L, loader.lastAppliedOffset());
+            assertEquals(IBP_3_3_IV2.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
 
             // Make sure the topic in the initial snapshot was overwritten by loading the new snapshot.
             assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
             assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar"));
+
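+            // Committing a FeatureLevelRecord should advance the loader's current
+            // metadata version metric to the new level.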
+            loader.handleCommit(new MockBatchReader(500, asList(
+                MockBatchReader.newBatch(500, 100, asList(
+                    new ApiMessageAndVersion(new FeatureLevelRecord().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setFeatureLevel(IBP_3_5_IV0.featureLevel()), (short) 0))))));
+            loader.waitForAllEventsToBeHandled();
+            assertEquals(IBP_3_5_IV0.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
         }
         faultHandler.maybeRethrowFirstException();
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
new file mode 100644
index 00000000000..c8d33754106
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
+import org.apache.kafka.image.MetadataProvenance;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class MetadataLoaderMetricsTest {
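+    /**
+     * Wraps a real MetadataLoaderMetrics, routing its callbacks into atomics so that
+     * the tests below can assert on the last values it reported.
+     */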
+    private static class FakeMetadataLoaderMetrics implements AutoCloseable {
+        final AtomicLong batchProcessingTimeNs = new AtomicLong(0L);
+        final AtomicInteger batchSize = new AtomicInteger(0);
+        final AtomicReference<MetadataProvenance> provenance =
+            new AtomicReference<>(MetadataProvenance.EMPTY);
+        final MetadataLoaderMetrics metrics;
+
+        FakeMetadataLoaderMetrics() {
+            this(Optional.empty());
+        }
+
+        FakeMetadataLoaderMetrics(MetricsRegistry registry) {
+            this(Optional.of(registry));
+        }
+
+        FakeMetadataLoaderMetrics(Optional<MetricsRegistry> registry) {
+            metrics = new MetadataLoaderMetrics(
+                registry,
+                n -> batchProcessingTimeNs.set(n),
+                n -> batchSize.set(n),
+                provenance);
+        }
+
+        @Override
+        public void close() {
+            metrics.close();
+        }
+    }
+
+    @Test
+    public void testMetricNames() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try {
+            try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+                ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+                    new HashSet<>(Arrays.asList(
+                        "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+                        "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+                    )));
+            }
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+                    Collections.emptySet());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
+    @Test
+    public void testUpdateBatchProcessingTimeNs() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+            fakeMetrics.metrics.updateBatchProcessingTimeNs(123L);
+            assertEquals(123L, fakeMetrics.batchProcessingTimeNs.get());
+        }
+    }
+
+    @Test
+    public void testUpdateBatchSize() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+            fakeMetrics.metrics.updateBatchSize(50);
+            assertEquals(50, fakeMetrics.batchSize.get());
+        }
+    }
+
+    @Test
+    public void testUpdateLastAppliedImageProvenance() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+            fakeMetrics.metrics.updateLastAppliedImageProvenance(new MetadataProvenance(1L, 2, 3L));
+            assertEquals(new MetadataProvenance(1L, 2, 3L), fakeMetrics.provenance.get());
+        }
+    }
+
+    @Test
+    public void testManagedMetrics() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try {
+            try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+                fakeMetrics.metrics.setCurrentMetadataVersion(IBP_3_3_IV2);
+                fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
+                fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
+
+                @SuppressWarnings("unchecked")
+                Gauge<Integer> currentMetadataVersion = (Gauge<Integer>) registry
+                    .allMetrics()
+                    .get(metricName("MetadataLoader", "CurrentMetadataVersion"));
+                assertEquals(IBP_3_3_IV2.featureLevel(),
+                    currentMetadataVersion.value().shortValue());
+
+                @SuppressWarnings("unchecked")
+                Gauge<Long> loadSnapshotCount = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("MetadataLoader", "HandleLoadSnapshotCount"));
+                assertEquals(2L, loadSnapshotCount.value().longValue());
+            }
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+                Collections.emptySet());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
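+    // Builds the MetricName these gauges are registered under (group kafka.server,
+    // no scope), mirroring the mBean naming asserted in testMetricNames above.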
+    private static MetricName metricName(String type, String name) {
+        String mBeanName = String.format("kafka.server:type=%s,name=%s", type, name);
+        return new MetricName("kafka.server", type, name, null, mBeanName);
+    }
+}