You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2023/10/17 23:06:30 UTC

[kafka] branch trunk updated: KAFKA-14519; [1/N] Implement coordinator runtime metrics (#14417)

This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new abee8f711c5 KAFKA-14519; [1/N] Implement coordinator runtime metrics (#14417)
abee8f711c5 is described below

commit abee8f711c5c9ab6cae80406ce8ccd65f62841ce
Author: Jeff Kim <ki...@gmail.com>
AuthorDate: Wed Oct 18 08:06:23 2023 +0900

    KAFKA-14519; [1/N] Implement coordinator runtime metrics (#14417)
    
    Implements the following metrics:
    
    kafka.server:type=group-coordinator-metrics,name=num-partitions,state=loading
    kafka.server:type=group-coordinator-metrics,name=num-partitions,state=active
    kafka.server:type=group-coordinator-metrics,name=num-partitions,state=failed
    kafka.server:type=group-coordinator-metrics,name=event-queue-size
    kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
    kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg
    kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
    kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg
    The PR makes these metrics generic so that in the future the transaction coordinator runtime can implement the same metrics in a similar fashion.
    
    Also, CoordinatorLoaderImpl#load will now return LoadSummary which encapsulates the start time, end time, number of records/bytes.
    
    Co-authored-by: David Jacot <dj...@confluent.io>
    
    Reviewers:  Ritika Reddy <rr...@confluent.io>, Calvin Liu <ca...@confluent.io>, David Jacot <dj...@confluent.io>, Justine Olshan <jo...@confluent.io>
---
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   4 +
 checkstyle/suppressions.xml                        |   2 +
 .../coordinator/group/CoordinatorLoaderImpl.scala  |  21 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   3 +
 .../group/CoordinatorLoaderImplTest.scala          |  71 +++++-
 .../coordinator/group/GroupCoordinatorService.java |  14 +-
 .../group/metrics/CoordinatorRuntimeMetrics.java   |  68 ++++++
 .../metrics/GroupCoordinatorRuntimeMetrics.java    | 219 +++++++++++++++++++
 .../group/runtime/CoordinatorEvent.java            |   5 +
 .../group/runtime/CoordinatorLoader.java           |  45 +++-
 .../group/runtime/CoordinatorRuntime.java          |  69 +++++-
 .../group/runtime/MultiThreadedEventProcessor.java |  63 +++++-
 .../GroupCoordinatorRuntimeMetricsTest.java        | 149 +++++++++++++
 .../group/runtime/CoordinatorRuntimeTest.java      | 198 +++++++++++++++--
 .../runtime/MultiThreadedEventProcessorTest.java   | 243 ++++++++++++++++++++-
 16 files changed, 1124 insertions(+), 51 deletions(-)

diff --git a/build.gradle b/build.gradle
index c88ac7b189d..8fc01156a86 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1260,6 +1260,7 @@ project(':group-coordinator') {
     implementation project(':clients')
     implementation project(':metadata')
     implementation libs.slf4jApi
+    implementation libs.metrics
 
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':server-common').sourceSets.test.output
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 22195e78ee6..ea41f587c92 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -239,6 +239,10 @@
       <allow pkg="org.apache.kafka.server.util"/>
       <allow pkg="org.apache.kafka.test" />
       <allow pkg="org.apache.kafka.timeline" />
+      <subpackage name="metrics">
+        <allow pkg="org.apache.kafka.common.metrics" />
+        <allow pkg="org.apache.kafka.server.metrics" />
+      </subpackage>
     </subpackage>
   </subpackage>
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a554fcc56b6..a66e63428dc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -325,6 +325,8 @@
               files="(ConsumerGroupMember|GroupMetadataManager).java"/>
     <suppress checks="(NPathComplexity|MethodLength)"
               files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
+    <suppress checks="NPathComplexity"
+              files="CoordinatorRuntime.java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
     <suppress checks="ParameterNumber"
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index 6b67d1a7a6b..e2ac2f66ead 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -21,7 +21,8 @@ import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
-import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, LoadSummary, UnknownRecordTypeException}
 import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
 import org.apache.kafka.server.util.KafkaScheduler
 import org.apache.kafka.storage.internals.log.FetchIsolation
@@ -41,6 +42,7 @@ import scala.jdk.CollectionConverters._
  * @tparam T The record type.
  */
 class CoordinatorLoaderImpl[T](
+  time: Time,
   replicaManager: ReplicaManager,
   deserializer: Deserializer[T],
   loadBufferSize: Int
@@ -59,10 +61,11 @@ class CoordinatorLoaderImpl[T](
   override def load(
     tp: TopicPartition,
     coordinator: CoordinatorPlayback[T]
-): CompletableFuture[Void] = {
-    val future = new CompletableFuture[Void]()
+): CompletableFuture[LoadSummary] = {
+    val future = new CompletableFuture[LoadSummary]()
+    val startTimeMs = time.milliseconds()
     val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
-      () => doLoad(tp, coordinator, future))
+      () => doLoad(tp, coordinator, future, startTimeMs))
     if (result.isCancelled) {
       future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
     }
@@ -72,7 +75,8 @@ class CoordinatorLoaderImpl[T](
   private def doLoad(
     tp: TopicPartition,
     coordinator: CoordinatorPlayback[T],
-    future: CompletableFuture[Void]
+    future: CompletableFuture[LoadSummary],
+    startTimeMs: Long
   ): Unit = {
     try {
       replicaManager.getLog(tp) match {
@@ -92,6 +96,8 @@ class CoordinatorLoaderImpl[T](
           // the log end offset but the log is empty. This could happen with compacted topics.
           var readAtLeastOneRecord = true
 
+          var numRecords = 0
+          var numBytes = 0
           while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
             val fetchDataInfo = log.read(
               startOffset = currentOffset,
@@ -131,6 +137,7 @@ class CoordinatorLoaderImpl[T](
                 throw new IllegalStateException("Control batches are not supported yet.")
               } else {
                 batch.asScala.foreach { record =>
+                  numRecords = numRecords + 1
                   try {
                     coordinator.replay(deserializer.deserialize(record.key, record.value))
                   } catch {
@@ -143,10 +150,12 @@ class CoordinatorLoaderImpl[T](
 
               currentOffset = batch.nextOffset
             }
+            numBytes = numBytes + memoryRecords.sizeInBytes()
           }
+          val endTimeMs = time.milliseconds()
 
           if (isRunning.get) {
-            future.complete(null)
+            future.complete(new LoadSummary(startTimeMs, endTimeMs, numRecords, numBytes))
           } else {
             future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
           }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index daec8f73089..fe8e5acb9fa 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition}
 import org.apache.kafka.coordinator.group
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics
 import org.apache.kafka.coordinator.group.util.SystemTimerReaper
 import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
 import org.apache.kafka.image.publisher.MetadataPublisher
@@ -531,6 +532,7 @@ class BrokerServer(
         new SystemTimer("group-coordinator")
       )
       val loader = new CoordinatorLoaderImpl[group.Record](
+        time,
         replicaManager,
         serde,
         config.offsetsLoadBufferSize
@@ -546,6 +548,7 @@ class BrokerServer(
         .withTimer(timer)
         .withLoader(loader)
         .withWriter(writer)
+        .withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics))
         .build()
     } else {
       GroupCoordinatorAdapter(
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index c71c28d47ea..ef19d732c34 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -22,14 +22,16 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.utils.{MockTime, Time}
 import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
 import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
 import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogOffsetMetadata}
 import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
 import org.junit.jupiter.api.{Test, Timeout}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.invocation.InvocationOnMock
 
 import java.nio.ByteBuffer
 import java.nio.charset.Charset
@@ -54,6 +56,7 @@ class CoordinatorLoaderImplTest {
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
     TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
       loadBufferSize = 1000
@@ -73,6 +76,7 @@ class CoordinatorLoaderImplTest {
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
     TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
       loadBufferSize = 1000
@@ -93,6 +97,7 @@ class CoordinatorLoaderImplTest {
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
     TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
       loadBufferSize = 1000
@@ -126,7 +131,7 @@ class CoordinatorLoaderImplTest {
         minOneMessage = true
       )).thenReturn(readResult2)
 
-      assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
 
       verify(coordinator).replay(("k1", "v1"))
       verify(coordinator).replay(("k2", "v2"))
@@ -145,6 +150,7 @@ class CoordinatorLoaderImplTest {
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
     TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
       loadBufferSize = 1000
@@ -187,6 +193,7 @@ class CoordinatorLoaderImplTest {
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
     TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
       loadBufferSize = 1000
@@ -226,6 +233,7 @@ class CoordinatorLoaderImplTest {
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
     TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
       loadBufferSize = 1000
@@ -266,6 +274,7 @@ class CoordinatorLoaderImplTest {
     val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
 
     TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time = Time.SYSTEM,
       replicaManager = replicaManager,
       deserializer = serde,
       loadBufferSize = 1000
@@ -283,7 +292,63 @@ class CoordinatorLoaderImplTest {
         minOneMessage = true
       )).thenReturn(readResult)
 
-      assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+      assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+    }
+  }
+
+  @Test
+  def testLoadSummary(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val serde = new StringKeyValueDeserializer
+    val log = mock(classOf[UnifiedLog])
+    val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+    val time = new MockTime()
+
+    TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+      time,
+      replicaManager = replicaManager,
+      deserializer = serde,
+      loadBufferSize = 1000
+    )) { loader =>
+      val startTimeMs = time.milliseconds()
+      when(replicaManager.getLog(tp)).thenReturn(Some(log))
+      when(log.logStartOffset).thenReturn(0L)
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+
+      val readResult1 = logReadResult(startOffset = 0, records = Seq(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 0L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenAnswer((_: InvocationOnMock) => {
+        time.sleep(1000)
+        readResult1
+      })
+
+      val readResult2 = logReadResult(startOffset = 2, records = Seq(
+        new SimpleRecord("k3".getBytes, "v3".getBytes),
+        new SimpleRecord("k4".getBytes, "v4".getBytes),
+        new SimpleRecord("k5".getBytes, "v5".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 2L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult2)
+
+      val summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
+      assertEquals(startTimeMs, summary.startTimeMs())
+      assertEquals(startTimeMs + 1000, summary.endTimeMs())
+      assertEquals(5, summary.numRecords())
+      assertEquals(readResult1.records.sizeInBytes() + readResult2.records.sizeInBytes(), summary.numBytes())
     }
   }
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index e30bb930269..176dc770cfe 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
 import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
@@ -102,6 +103,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
         private CoordinatorLoader<Record> loader;
         private Time time;
         private Timer timer;
+        private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
 
         public Builder(
             int nodeId,
@@ -131,6 +133,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
             return this;
         }
 
+        public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
+            this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics;
+            return this;
+        }
+
         public GroupCoordinatorService build() {
             if (config == null)
                 throw new IllegalArgumentException("Config must be set.");
@@ -142,6 +149,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
                 throw new IllegalArgumentException("Time must be set.");
             if (timer == null)
                 throw new IllegalArgumentException("Timer must be set.");
+            if (coordinatorRuntimeMetrics == null)
+                throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
 
             String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
             LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
@@ -152,7 +161,9 @@ public class GroupCoordinatorService implements GroupCoordinator {
             CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
                 logContext,
                 "group-coordinator-event-processor-",
-                config.numThreads
+                config.numThreads,
+                time,
+                coordinatorRuntimeMetrics
             );
 
             CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
@@ -166,6 +177,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
                     .withLoader(loader)
                     .withCoordinatorShardBuilderSupplier(supplier)
                     .withTime(time)
+                    .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
                     .build();
 
             return new GroupCoordinatorService(
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java
new file mode 100644
index 00000000000..8978a040007
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+
+    /**
+     * Called when the partition state changes.
+     * @param oldState The old state.
+     * @param newState The new state to transition to.
+     */
+    void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+    /**
+     * Record the partition load metric.
+     * @param startTimeMs The partition load start time.
+     * @param endTimeMs   The partition load end time.
+     */
+    void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+    /**
+     * Update the event queue time.
+     *
+     * @param durationMs The queue time.
+     */
+    void recordEventQueueTime(long durationMs);
+
+    /**
+     * Update the event queue processing time.
+     *
+     * @param durationMs The event processing time.
+     */
+    void recordEventQueueProcessingTime(long durationMs);
+
+    /**
+     * Record the thread idle ratio.
+     * @param ratio The idle ratio.
+     */
+    void recordThreadIdleRatio(double ratio);
+
+    /**
+     * Register the event queue size gauge.
+     *
+     * @param sizeSupplier The size supplier.
+     */
+    void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier);
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java
new file mode 100644
index 00000000000..e4c6552c4bc
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
+    /**
+     * The metrics group.
+     */
+    public static final String METRICS_GROUP = "group-coordinator-metrics";
+
+    /**
+     * The partition count metric name.
+     */
+    public static final String NUM_PARTITIONS_METRIC_NAME = "num-partitions";
+
+    /**
+     * Metric to count the number of partitions in Loading state.
+     */
+    private final MetricName numPartitionsLoading;
+    private final AtomicLong numPartitionsLoadingCounter = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Active state.
+     */
+    private final MetricName numPartitionsActive;
+    private final AtomicLong numPartitionsActiveCounter = new AtomicLong(0);
+
+    /**
+     * Metric to count the number of partitions in Failed state.
+     */
+    private final MetricName numPartitionsFailed;
+    private final AtomicLong numPartitionsFailedCounter = new AtomicLong(0);
+
+    /**
+     * Metric to count the size of the processor queue.
+     */
+    private final MetricName eventQueueSize;
+
+    /**
+     * The Kafka metrics registry.
+     */
+    private final Metrics metrics;
+
+    /**
+     * The partition load sensor.
+     */
+    private Sensor partitionLoadSensor;
+
+    /**
+     * The thread idle sensor.
+     */
+    private Sensor threadIdleRatioSensor;
+
+    public GroupCoordinatorRuntimeMetrics(Metrics metrics) {
+        this.metrics = Objects.requireNonNull(metrics);
+
+        this.numPartitionsLoading = kafkaMetricName(
+            NUM_PARTITIONS_METRIC_NAME,
+            "The number of partitions in Loading state.",
+            "state", "loading"
+        );
+
+        this.numPartitionsActive = kafkaMetricName(
+            NUM_PARTITIONS_METRIC_NAME,
+            "The number of partitions in Active state.",
+            "state", "active"
+        );
+
+        this.numPartitionsFailed = kafkaMetricName(
+            NUM_PARTITIONS_METRIC_NAME,
+            "The number of partitions in Failed state.",
+            "state", "failed"
+        );
+
+        this.eventQueueSize = kafkaMetricName("event-queue-size", "The event accumulator queue size.");
+
+        metrics.addMetric(numPartitionsLoading, (Gauge<Long>) (config, now) -> numPartitionsLoadingCounter.get());
+        metrics.addMetric(numPartitionsActive, (Gauge<Long>) (config, now) -> numPartitionsActiveCounter.get());
+        metrics.addMetric(numPartitionsFailed, (Gauge<Long>) (config, now) -> numPartitionsFailedCounter.get());
+
+        this.partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime");
+        this.partitionLoadSensor.add(
+            metrics.metricName(
+                "partition-load-time-max",
+                METRICS_GROUP,
+                "The max time it took to load the partitions in the last 30 sec."
+            ), new Max());
+        this.partitionLoadSensor.add(
+            metrics.metricName(
+                "partition-load-time-avg",
+                METRICS_GROUP,
+                "The average time it took to load the partitions in the last 30 sec."
+            ), new Avg());
+
+        this.threadIdleRatioSensor = metrics.sensor("ThreadIdleRatio");
+        this.threadIdleRatioSensor.add(
+            metrics.metricName(
+                "thread-idle-ratio-min",
+                METRICS_GROUP,
+                "The minimum thread idle ratio over the last 30 seconds."
+            ), new Min());
+        this.threadIdleRatioSensor.add(
+            metrics.metricName(
+                "thread-idle-ratio-avg",
+                METRICS_GROUP,
+                "The average thread idle ratio over the last 30 seconds."
+            ), new Avg());
+    }
+
+    /**
+     * Retrieve the kafka metric name.
+     *
+     * @param name The name of the metric.
+     *
+     * @return The kafka metric name.
+     */
+    private MetricName kafkaMetricName(String name, String description, String... keyValue) {
+        return metrics.metricName(name, METRICS_GROUP, description, keyValue);
+    }
+
+    @Override
+    public void close() {
+        Arrays.asList(
+            numPartitionsLoading,
+            numPartitionsActive,
+            numPartitionsFailed,
+            eventQueueSize
+        ).forEach(metrics::removeMetric);
+
+        metrics.removeSensor(partitionLoadSensor.name());
+        metrics.removeSensor(threadIdleRatioSensor.name());
+    }
+
+    /**
+     * Called when the partition state changes. Decrement the old state and increment the new state.
+     *
+     * @param oldState The old state.
+     * @param newState The new state to transition to.
+     */
+    @Override
+    public void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState) {
+        switch (oldState) {
+            case INITIAL:
+            case CLOSED:
+                break;
+            case LOADING:
+                numPartitionsLoadingCounter.decrementAndGet();
+                break;
+            case ACTIVE:
+                numPartitionsActiveCounter.decrementAndGet();
+                break;
+            case FAILED:
+                numPartitionsFailedCounter.decrementAndGet();
+        }
+
+        switch (newState) {
+            case INITIAL:
+            case CLOSED:
+                break;
+            case LOADING:
+                numPartitionsLoadingCounter.incrementAndGet();
+                break;
+            case ACTIVE:
+                numPartitionsActiveCounter.incrementAndGet();
+                break;
+            case FAILED:
+                numPartitionsFailedCounter.incrementAndGet();
+        }
+    }
+
+    @Override
+    public void recordPartitionLoadSensor(long startTimeMs, long endTimeMs) {
+        this.partitionLoadSensor.record(endTimeMs - startTimeMs, endTimeMs, false);
+    }
+
+    @Override
+    public void recordEventQueueTime(long durationMs) { }
+
+    @Override
+    public void recordEventQueueProcessingTime(long durationMs) { }
+
+    @Override
+    public void recordThreadIdleRatio(double ratio) {
+        threadIdleRatioSensor.record(ratio);
+    }
+
+    @Override
+    public void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier) {
+        metrics.addMetric(eventQueueSize, (Gauge<Long>) (config, now) -> (long) sizeSupplier.get());
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
index fb9bdbed651..19fc0d88cf8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
@@ -35,4 +35,9 @@ public interface CoordinatorEvent extends EventAccumulator.Event<TopicPartition>
      * @param exception An exception if the processing of the event failed or null otherwise.
      */
     void complete(Throwable exception);
+
+    /**
+     * @return The created time in milliseconds.
+     */
+    long createdTimeMs();
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
index 0fe23b5dc69..dd6a67ec15c 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
@@ -46,6 +46,49 @@ public interface CoordinatorLoader<U> extends AutoCloseable {
         }
     }
 
+    /**
+     * Object that is returned as part of the future from load(). Holds the partition load time and the
+     * end time.
+     */
+    class LoadSummary {
+        private final long startTimeMs;
+        private final long endTimeMs;
+        private final long numRecords;
+        private final long numBytes;
+
+        public LoadSummary(long startTimeMs, long endTimeMs, long numRecords, long numBytes) {
+            this.startTimeMs = startTimeMs;
+            this.endTimeMs = endTimeMs;
+            this.numRecords = numRecords;
+            this.numBytes = numBytes;
+        }
+
+        public long startTimeMs() {
+            return startTimeMs;
+        }
+
+        public long endTimeMs() {
+            return endTimeMs;
+        }
+
+        public long numRecords() {
+            return numRecords;
+        }
+
+        public long numBytes() {
+            return numBytes;
+        }
+
+        @Override
+        public String toString() {
+            return "LoadSummary(" +
+                "startTimeMs=" + startTimeMs +
+                ", endTimeMs=" + endTimeMs +
+                ", numRecords=" + numRecords +
+                ", numBytes=" + numBytes + ")";
+        }
+    }
+
     /**
      * Deserializer to translates bytes to T.
      *
@@ -69,7 +112,7 @@ public interface CoordinatorLoader<U> extends AutoCloseable {
      * @param tp            The TopicPartition to read from.
      * @param coordinator   The object to apply records to.
      */
-    CompletableFuture<Void> load(
+    CompletableFuture<LoadSummary> load(
         TopicPartition tp,
         CoordinatorPlayback<U> coordinator
     );
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 8e0f53f5d0b..deeed11bc85 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
 import org.apache.kafka.deferred.DeferredEvent;
 import org.apache.kafka.deferred.DeferredEventQueue;
 import org.apache.kafka.image.MetadataDelta;
@@ -89,6 +90,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         private CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
         private Time time = Time.SYSTEM;
         private Timer timer;
+        private CoordinatorRuntimeMetrics runtimeMetrics;
 
         public Builder<S, U> withLogPrefix(String logPrefix) {
             this.logPrefix = logPrefix;
@@ -130,6 +132,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             return this;
         }
 
+        public Builder<S, U> withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics runtimeMetrics) {
+            this.runtimeMetrics = runtimeMetrics;
+            return this;
+        }
+
         public CoordinatorRuntime<S, U> build() {
             if (logPrefix == null)
                 logPrefix = "";
@@ -147,6 +154,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 throw new IllegalArgumentException("Time must be set.");
             if (timer == null)
                 throw new IllegalArgumentException("Timer must be set.");
+            if (runtimeMetrics == null)
+                throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
 
             return new CoordinatorRuntime<>(
                 logPrefix,
@@ -156,7 +165,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 loader,
                 coordinatorShardBuilderSupplier,
                 time,
-                timer
+                timer,
+                runtimeMetrics
             );
         }
     }
@@ -164,7 +174,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
     /**
      * The various state that a coordinator for a partition can be in.
      */
-    enum CoordinatorState {
+    public enum CoordinatorState {
         /**
          * Initial state when a coordinator is created.
          */
@@ -501,6 +511,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 throw new IllegalStateException("Cannot transition from " + state + " to " + newState);
             }
 
+            CoordinatorState oldState = state;
             log.debug("Transition from {} to {}.", state, newState);
             switch (newState) {
                 case LOADING:
@@ -537,6 +548,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                 default:
                     throw new IllegalArgumentException("Transitioning to " + newState + " is not supported.");
             }
+
+            runtimeMetrics.recordPartitionStateChange(oldState, state);
         }
 
         /**
@@ -608,6 +621,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
          */
         CoordinatorResult<T, U> result;
 
+        /**
+         * The time this event was created.
+         */
+        private final long createdTimeMs;
+
         /**
          * Constructor.
          *
@@ -624,6 +642,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             this.name = name;
             this.op = op;
             this.future = new CompletableFuture<>();
+            this.createdTimeMs = time.milliseconds();
         }
 
         /**
@@ -709,6 +728,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             }
         }
 
+        @Override
+        public long createdTimeMs() {
+            return this.createdTimeMs;
+        }
+
         @Override
         public String toString() {
             return "CoordinatorWriteEvent(name=" + name + ")";
@@ -768,6 +792,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
          */
         T response;
 
+        /**
+         * The time this event was created.
+         */
+        private final long createdTimeMs;
+
         /**
          * Constructor.
          *
@@ -784,6 +813,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             this.name = name;
             this.op = op;
             this.future = new CompletableFuture<>();
+            this.createdTimeMs = time.milliseconds();
         }
 
         /**
@@ -832,6 +862,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             }
         }
 
+        @Override
+        public long createdTimeMs() {
+            return this.createdTimeMs;
+        }
+
         @Override
         public String toString() {
             return "CoordinatorReadEvent(name=" + name + ")";
@@ -857,6 +892,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
          */
         final Runnable op;
 
+        /**
+         * The time this event was created.
+         */
+        private final long createdTimeMs;
+
         /**
          * Constructor.
          *
@@ -872,6 +912,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             this.tp = tp;
             this.name = name;
             this.op = op;
+            this.createdTimeMs = time.milliseconds();
         }
 
         /**
@@ -907,6 +948,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             }
         }
 
+        @Override
+        public long createdTimeMs() {
+            return this.createdTimeMs;
+        }
+
         @Override
         public String toString() {
             return "InternalEvent(name=" + name + ")";
@@ -995,6 +1041,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
      */
     private final CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
 
+    /**
+     * The coordinator runtime metrics.
+     */
+    private final CoordinatorRuntimeMetrics runtimeMetrics;
+
     /**
      * Atomic boolean indicating whether the runtime is running.
      */
@@ -1025,7 +1076,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         CoordinatorLoader<U> loader,
         CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier,
         Time time,
-        Timer timer
+        Timer timer,
+        CoordinatorRuntimeMetrics runtimeMetrics
     ) {
         this.logPrefix = logPrefix;
         this.logContext = logContext;
@@ -1038,6 +1090,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
         this.highWatermarklistener = new HighWatermarkListener();
         this.loader = loader;
         this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
+        this.runtimeMetrics = runtimeMetrics;
     }
 
     /**
@@ -1242,7 +1295,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                         case FAILED:
                         case INITIAL:
                             context.transitionTo(CoordinatorState.LOADING);
-                            loader.load(tp, context.coordinator).whenComplete((state, exception) -> {
+                            loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
                                 scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
                                     withContextOrThrow(tp, ctx -> {
                                         if (ctx.state != CoordinatorState.LOADING) {
@@ -1254,8 +1307,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                                         try {
                                             if (exception != null) throw exception;
                                             ctx.transitionTo(CoordinatorState.ACTIVE);
-                                            log.info("Finished loading of metadata from {} with epoch {}.",
-                                                tp, partitionEpoch
+                                            if (summary != null) {
+                                                runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs());
+                                            }
+                                            log.info("Finished loading of metadata from {} with epoch {} and LoadSummary={}.",
+                                                tp, partitionEpoch, summary
                                             );
                                         } catch (Throwable ex) {
                                             log.error("Failed to load metadata from {} with epoch {} due to {}.",
@@ -1373,6 +1429,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             context.transitionTo(CoordinatorState.CLOSED);
         });
         coordinators.clear();
+        Utils.closeQuietly(runtimeMetrics, "runtime metrics");
         log.info("Coordinator runtime closed.");
     }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index 1f35d65c3f4..e4adc18e957 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -18,9 +18,12 @@ package org.apache.kafka.coordinator.group.runtime;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
 import org.slf4j.Logger;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -52,21 +55,50 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
      */
     private volatile boolean shuttingDown;
 
+    /**
+     * The coordinator runtime metrics.
+     */
+    private final CoordinatorRuntimeMetrics metrics;
+
+    /**
+     * The time.
+     */
+    private final Time time;
+
+    public MultiThreadedEventProcessor(
+        LogContext logContext,
+        String threadPrefix,
+        int numThreads,
+        Time time,
+        CoordinatorRuntimeMetrics metrics
+    ) {
+        this(logContext, threadPrefix, numThreads, time, metrics, new EventAccumulator<>());
+    }
+
     /**
      * Constructor.
      *
-     * @param logContext    The log context.
-     * @param threadPrefix  The thread prefix.
-     * @param numThreads    The number of threads.
+     * @param logContext        The log context.
+     * @param threadPrefix      The thread prefix.
+     * @param numThreads        The number of threads.
+     * @param metrics           The coordinator runtime metrics.
+     * @param time              The time.
+     * @param eventAccumulator  The event accumulator.
      */
     public MultiThreadedEventProcessor(
         LogContext logContext,
         String threadPrefix,
-        int numThreads
+        int numThreads,
+        Time time,
+        CoordinatorRuntimeMetrics metrics,
+        EventAccumulator<TopicPartition, CoordinatorEvent> eventAccumulator
     ) {
         this.log = logContext.logger(MultiThreadedEventProcessor.class);
         this.shuttingDown = false;
-        this.accumulator = new EventAccumulator<>();
+        this.accumulator = eventAccumulator;
+        this.time = Objects.requireNonNull(time);
+        this.metrics = Objects.requireNonNull(metrics);
+        this.metrics.registerEventQueueSizeGauge(accumulator::size);
         this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
             new EventProcessorThread(
                 threadPrefix + threadId
@@ -81,6 +113,9 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
      */
     private class EventProcessorThread extends Thread {
         private final Logger log;
+        private long pollStartMs;
+        private long timeSinceLastPollMs;
+        private long lastPollMs;
 
         EventProcessorThread(
             String name
@@ -92,11 +127,16 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
 
         private void handleEvents() {
             while (!shuttingDown) {
+                recordPollStartTime(time.milliseconds());
                 CoordinatorEvent event = accumulator.poll();
+                recordPollEndTime(time.milliseconds());
                 if (event != null) {
                     try {
                         log.debug("Executing event: {}.", event);
+                        long dequeuedTimeMs = time.milliseconds();
+                        metrics.recordEventQueueTime(dequeuedTimeMs - event.createdTimeMs());
                         event.run();
+                        metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
                     } catch (Throwable t) {
                         log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
                         event.complete(t);
@@ -112,6 +152,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
             while (event != null) {
                 try {
                     log.debug("Draining event: {}.", event);
+                    metrics.recordEventQueueTime(time.milliseconds() - event.createdTimeMs());
                     event.complete(new RejectedExecutionException("EventProcessor is closed."));
                 } catch (Throwable t) {
                     log.error("Failed to reject event {} due to: {}.", event, t.getMessage(), t);
@@ -145,6 +186,18 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
                 log.info("Shutdown completed");
             }
         }
+
+        private void recordPollStartTime(long pollStartMs) {
+            this.pollStartMs = pollStartMs;
+            this.timeSinceLastPollMs = lastPollMs != 0L ? pollStartMs - lastPollMs : 0;
+            this.lastPollMs = pollStartMs;
+        }
+
+        private void recordPollEndTime(long pollEndMs) {
+            long pollTimeMs = pollEndMs - pollStartMs;
+            double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs);
+            metrics.recordThreadIdleRatio(pollIdleRatio);
+        }
     }
 
     /**
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java
new file mode 100644
index 00000000000..f3726aafb17
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.METRICS_GROUP;
+import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.NUM_PARTITIONS_METRIC_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GroupCoordinatorRuntimeMetricsTest {
+    
+    @Test
+    public void testMetricNames() {
+        Metrics metrics = new Metrics();
+
+        HashSet<org.apache.kafka.common.MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
+            kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"),
+            kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"),
+            kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"),
+            metrics.metricName("event-queue-size", METRICS_GROUP),
+            metrics.metricName("partition-load-time-max", METRICS_GROUP),
+            metrics.metricName("partition-load-time-avg", METRICS_GROUP),
+            metrics.metricName("thread-idle-ratio-min", METRICS_GROUP),
+            metrics.metricName("thread-idle-ratio-avg", METRICS_GROUP)
+        ));
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+            expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName)));
+        }
+
+        expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName)));
+    }
+
+    @Test
+    public void testUpdateNumPartitionsMetrics() {
+        Metrics metrics = new Metrics();
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            IntStream.range(0, 10)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING));
+            IntStream.range(0, 8)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE));
+            IntStream.range(0, 8)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, CoordinatorState.FAILED));
+            IntStream.range(0, 2)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.FAILED, CoordinatorState.CLOSED));
+
+            assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"), 2);
+            assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"), 0);
+            assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"), 6);
+        }
+    }
+
+    @Test
+    public void testPartitionLoadSensorMetrics() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            long startTimeMs = time.milliseconds();
+            runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
+            runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 2000);
+
+            org.apache.kafka.common.MetricName metricName = metrics.metricName(
+                "partition-load-time-avg", METRICS_GROUP);
+
+            KafkaMetric metric = metrics.metrics().get(metricName);
+            assertEquals(1500.0, metric.metricValue());
+
+            metricName = metrics.metricName(
+                "partition-load-time-max", METRICS_GROUP);
+            metric = metrics.metrics().get(metricName);
+            assertEquals(2000.0, metric.metricValue());
+        }
+    }
+
+    @Test
+    public void testThreadIdleRatioSensor() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleRatio(1.0 / (i + 1)));
+
+            org.apache.kafka.common.MetricName metricName = metrics.metricName(
+                "thread-idle-ratio-avg", METRICS_GROUP);
+
+            KafkaMetric metric = metrics.metrics().get(metricName);
+            assertEquals((11.0 / 6.0) / 3.0, metric.metricValue()); // (6/6 + 3/6 + 2/6) / 3
+
+            metricName = metrics.metricName(
+                "thread-idle-ratio-min", METRICS_GROUP);
+            metric = metrics.metrics().get(metricName);
+            assertEquals(1.0 / 3.0, metric.metricValue());
+        }
+    }
+
+    @Test
+    public void testEventQueueSize() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            runtimeMetrics.registerEventQueueSizeGauge(() -> 5);
+            assertMetricGauge(metrics, kafkaMetricName(metrics, "event-queue-size"), 5);
+        }
+    }
+
+    private static void assertMetricGauge(Metrics metrics, org.apache.kafka.common.MetricName metricName, long count) {
+        assertEquals(count, (long) metrics.metric(metricName).metricValue());
+    }
+
+    private static com.yammer.metrics.core.MetricName yammerMetricName(String type, String name) {
+        String mBeanName = String.format("kafka.coordinator.group:type=%s,name=%s", type, name);
+        return new com.yammer.metrics.core.MetricName("kafka.coordinator.group", type, name, null, mBeanName);
+    }
+
+    private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) {
+        return metrics.metricName(name, METRICS_GROUP, "", keyValue);
+    }
+}
\ No newline at end of file
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index a7eb0f6b709..d776a07cd3a 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
@@ -45,6 +46,11 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.ACTIVE;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.CLOSED;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -118,9 +124,19 @@ public class CoordinatorRuntimeTest {
      * A CoordinatorLoader that always succeeds.
      */
     private static class MockCoordinatorLoader implements CoordinatorLoader<String> {
+        private final LoadSummary summary;
+
+        public MockCoordinatorLoader(LoadSummary summary) {
+            this.summary = summary;
+        }
+
+        public MockCoordinatorLoader() {
+            this(null);
+        }
+
         @Override
-        public CompletableFuture<Void> load(TopicPartition tp, CoordinatorPlayback<String> replayable) {
-            return CompletableFuture.completedFuture(null);
+        public CompletableFuture<LoadSummary> load(TopicPartition tp, CoordinatorPlayback<String> replayable) {
+            return CompletableFuture.completedFuture(summary);
         }
 
         @Override
@@ -271,6 +287,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -280,7 +297,7 @@ public class CoordinatorRuntimeTest {
         when(builder.withTopicPartition(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
         when(loader.load(TP, coordinator)).thenReturn(future);
 
         // Getting the coordinator context fails because the coordinator
@@ -294,13 +311,13 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
 
         // The coordinator is loading.
-        assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+        assertEquals(LOADING, ctx.state);
         assertEquals(0, ctx.epoch);
         assertEquals(coordinator, ctx.coordinator);
 
         // When the loading completes, the coordinator transitions to active.
         future.complete(null);
-        assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+        assertEquals(ACTIVE, ctx.state);
 
         // Verify that onLoaded is called.
         verify(coordinator, times(1)).onLoaded(MetadataImage.EMPTY);
@@ -335,6 +352,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -344,7 +362,7 @@ public class CoordinatorRuntimeTest {
         when(builder.withTopicPartition(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
         when(loader.load(TP, coordinator)).thenReturn(future);
 
         // Schedule the loading.
@@ -352,13 +370,13 @@ public class CoordinatorRuntimeTest {
 
         // Getting the context succeeds and the coordinator should be in loading.
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
-        assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+        assertEquals(LOADING, ctx.state);
         assertEquals(0, ctx.epoch);
         assertEquals(coordinator, ctx.coordinator);
 
         // When the loading fails, the coordinator transitions to failed.
         future.completeExceptionally(new Exception("failure"));
-        assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state);
+        assertEquals(FAILED, ctx.state);
 
         // Verify that onUnloaded is called.
         verify(coordinator, times(1)).onUnloaded();
@@ -386,6 +404,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -395,7 +414,7 @@ public class CoordinatorRuntimeTest {
         when(builder.withTopicPartition(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
         when(loader.load(TP, coordinator)).thenReturn(future);
 
         // Schedule the loading.
@@ -403,19 +422,19 @@ public class CoordinatorRuntimeTest {
 
         // Getting the context succeeds and the coordinator should be in loading.
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
-        assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+        assertEquals(LOADING, ctx.state);
         assertEquals(10, ctx.epoch);
         assertEquals(coordinator, ctx.coordinator);
 
         // When the loading completes, the coordinator transitions to active.
         future.complete(null);
-        assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+        assertEquals(ACTIVE, ctx.state);
         assertEquals(10, ctx.epoch);
 
         // Loading with a previous epoch is a no-op. The coordinator stays
         // in active state with the correct epoch.
         runtime.scheduleLoadOperation(TP, 0);
-        assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+        assertEquals(ACTIVE, ctx.state);
         assertEquals(10, ctx.epoch);
     }
 
@@ -435,6 +454,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -444,7 +464,7 @@ public class CoordinatorRuntimeTest {
         when(builder.withTopicPartition(any())).thenReturn(builder);
         when(builder.build()).thenReturn(coordinator);
         when(supplier.get()).thenReturn(builder);
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
         when(loader.load(TP, coordinator)).thenReturn(future);
 
         // Schedule the loading.
@@ -452,13 +472,13 @@ public class CoordinatorRuntimeTest {
 
         // Getting the context succeeds and the coordinator should be in loading.
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
-        assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+        assertEquals(LOADING, ctx.state);
         assertEquals(10, ctx.epoch);
         assertEquals(coordinator, ctx.coordinator);
 
         // When the loading fails, the coordinator transitions to failed.
         future.completeExceptionally(new Exception("failure"));
-        assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state);
+        assertEquals(FAILED, ctx.state);
 
         // Verify that onUnloaded is called.
         verify(coordinator, times(1)).onUnloaded();
@@ -474,7 +494,7 @@ public class CoordinatorRuntimeTest {
 
         // Getting the context succeeds and the coordinator should be in loading.
         ctx = runtime.contextOrThrow(TP);
-        assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+        assertEquals(LOADING, ctx.state);
         assertEquals(11, ctx.epoch);
         assertEquals(coordinator, ctx.coordinator);
 
@@ -482,7 +502,7 @@ public class CoordinatorRuntimeTest {
         future.complete(null);
 
         // Verify the state.
-        assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+        assertEquals(ACTIVE, ctx.state);
     }
 
     @Test
@@ -501,6 +521,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -514,12 +535,12 @@ public class CoordinatorRuntimeTest {
         // Loads the coordinator. It directly transitions to active.
         runtime.scheduleLoadOperation(TP, 10);
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
-        assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+        assertEquals(ACTIVE, ctx.state);
         assertEquals(10, ctx.epoch);
 
         // Schedule the unloading.
         runtime.scheduleUnloadOperation(TP, ctx.epoch + 1);
-        assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, ctx.state);
+        assertEquals(CLOSED, ctx.state);
 
         // Verify that onUnloaded is called.
         verify(coordinator, times(1)).onUnloaded();
@@ -549,6 +570,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -564,13 +586,13 @@ public class CoordinatorRuntimeTest {
         // Loads the coordinator. It directly transitions to active.
         runtime.scheduleLoadOperation(TP, 10);
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
-        assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+        assertEquals(ACTIVE, ctx.state);
         assertEquals(10, ctx.epoch);
 
         // Unloading with a previous epoch is a no-op. The coordinator stays
         // in active with the correct epoch.
         runtime.scheduleUnloadOperation(TP, 0);
-        assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+        assertEquals(ACTIVE, ctx.state);
         assertEquals(10, ctx.epoch);
     }
 
@@ -587,6 +609,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Schedule the loading.
@@ -695,6 +718,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Scheduling a write fails with a NotCoordinatorException because the coordinator
@@ -715,6 +739,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -739,6 +764,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -784,6 +810,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -831,6 +858,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -885,6 +913,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Schedule a read. It fails because the coordinator does not exist.
@@ -906,6 +935,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -947,6 +977,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -1012,6 +1043,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class);
@@ -1028,10 +1060,10 @@ public class CoordinatorRuntimeTest {
             .thenReturn(coordinator0)
             .thenReturn(coordinator1);
 
-        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        CompletableFuture<CoordinatorLoader.LoadSummary> future0 = new CompletableFuture<>();
         when(loader.load(tp0, coordinator0)).thenReturn(future0);
 
-        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        CompletableFuture<CoordinatorLoader.LoadSummary> future1 = new CompletableFuture<>();
         when(loader.load(tp1, coordinator1)).thenReturn(future1);
 
         runtime.scheduleLoadOperation(tp0, 0);
@@ -1067,6 +1099,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -1118,6 +1151,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(processor)
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -1189,6 +1223,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(processor)
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -1257,6 +1292,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -1313,6 +1349,7 @@ public class CoordinatorRuntimeTest {
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(new MockPartitionWriter())
                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
                 .build();
 
         // Loads the coordinator.
@@ -1340,4 +1377,119 @@ public class CoordinatorRuntimeTest {
         assertEquals(1, cnt.get());
         assertEquals(0, ctx.timer.size());
     }
+
+    @Test
+    public void testStateChanges() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(loader)
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+        CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
+        when(loader.load(TP, coordinator)).thenReturn(future);
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 0);
+
+        // Getting the context succeeds and the coordinator should be in loading.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING);
+
+        // When the loading fails, the coordinator transitions to failed.
+        future.completeExceptionally(new Exception("failure"));
+        assertEquals(FAILED, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED);
+
+        // Start loading a new topic partition.
+        TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+        future = new CompletableFuture<>();
+        when(loader.load(tp, coordinator)).thenReturn(future);
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(tp, 0);
+        // Getting the context succeeds and the coordinator should be in loading.
+        ctx = runtime.contextOrThrow(tp);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
+
+        // When the loading completes, the coordinator transitions to active.
+        future.complete(null);
+        assertEquals(ACTIVE, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE);
+
+        runtime.close();
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(ACTIVE, CLOSED);
+    }
+
+    @Test
+    public void testPartitionLoadSensor() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+        long startTimeMs = timer.time().milliseconds();
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(new MockCoordinatorLoader(
+                    new CoordinatorLoader.LoadSummary(
+                        startTimeMs,
+                        startTimeMs + 1000,
+                        30,
+                        3000)))
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Getting the coordinator context fails because the coordinator
+        // does not exist until scheduleLoadOperation is called.
+        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 0);
+
+        // Getting the coordinator context succeeds now.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+
+        // When the loading completes, the coordinator transitions to active.
+        assertEquals(ACTIVE, ctx.state);
+
+        verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
index 0630c88f389..f6017b90bcd 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
@@ -18,16 +18,24 @@ package org.apache.kafka.coordinator.group.runtime;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.mockito.ArgumentCaptor;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
@@ -37,9 +45,63 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 @Timeout(value = 60)
 public class MultiThreadedEventProcessorTest {
+    private static class MockEventAccumulator<T> extends EventAccumulator<TopicPartition, CoordinatorEvent> {
+        private final Time time;
+        private final Queue<CoordinatorEvent> events;
+        private final long timeToPollMs;
+        private final AtomicBoolean isClosed;
+
+        public MockEventAccumulator(Time time, long timeToPollMs) {
+            this.time = time;
+            this.events = new LinkedList<>();
+            this.timeToPollMs = timeToPollMs;
+            this.isClosed = new AtomicBoolean(false);
+        }
+
+        @Override
+        public CoordinatorEvent poll() {
+            synchronized (events) {
+                while (events.isEmpty() && !isClosed.get()) {
+                    try {
+                        events.wait();
+                    } catch (Exception ignored) {
+                        
+                    }
+                }
+                time.sleep(timeToPollMs);
+                return events.poll();
+            }
+        }
+
+        @Override
+        public CoordinatorEvent poll(long timeout, TimeUnit unit) {
+            return null;
+        }
+
+        @Override
+        public void add(CoordinatorEvent event) throws RejectedExecutionException {
+            synchronized (events) {
+                events.add(event);
+                events.notifyAll();
+            }
+        }
+
+        @Override
+        public void close() {
+            isClosed.set(true);
+            synchronized (events) {
+                events.notifyAll();
+            }
+        }
+    }
 
     private static class FutureEvent<T> implements CoordinatorEvent {
         private final TopicPartition key;
@@ -48,18 +110,28 @@ public class MultiThreadedEventProcessorTest {
         private final boolean block;
         private final CountDownLatch latch;
         private final CountDownLatch executed;
+        private long createdTimeMs;
 
         FutureEvent(
             TopicPartition key,
             Supplier<T> supplier
         ) {
-            this(key, supplier, false);
+            this(key, supplier, false, 0L);
         }
 
         FutureEvent(
             TopicPartition key,
             Supplier<T> supplier,
             boolean block
+        ) {
+            this(key, supplier, block, 0L);
+        }
+
+        FutureEvent(
+            TopicPartition key,
+            Supplier<T> supplier,
+            boolean block,
+            long createdTimeMs
         ) {
             this.key = key;
             this.future = new CompletableFuture<>();
@@ -67,6 +139,7 @@ public class MultiThreadedEventProcessorTest {
             this.block = block;
             this.latch = new CountDownLatch(1);
             this.executed = new CountDownLatch(1);
+            this.createdTimeMs = createdTimeMs;
         }
 
         @Override
@@ -90,6 +163,11 @@ public class MultiThreadedEventProcessorTest {
             future.completeExceptionally(ex);
         }
 
+        @Override
+        public long createdTimeMs() {
+            return createdTimeMs;
+        }
+
         @Override
         public TopicPartition key() {
             return key;
@@ -118,7 +196,9 @@ public class MultiThreadedEventProcessorTest {
         CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
             new LogContext(),
             "event-processor-",
-            2
+            2,
+            Time.SYSTEM,
+            mock(GroupCoordinatorRuntimeMetrics.class)
         );
         eventProcessor.close();
     }
@@ -128,7 +208,9 @@ public class MultiThreadedEventProcessorTest {
         try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
             new LogContext(),
             "event-processor-",
-            2
+            2,
+            Time.SYSTEM,
+            mock(GroupCoordinatorRuntimeMetrics.class)
         )) {
             AtomicInteger numEventsExecuted = new AtomicInteger(0);
 
@@ -163,7 +245,9 @@ public class MultiThreadedEventProcessorTest {
         try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
             new LogContext(),
             "event-processor-",
-            2
+            2,
+            Time.SYSTEM,
+            mock(GroupCoordinatorRuntimeMetrics.class)
         )) {
             AtomicInteger numEventsExecuted = new AtomicInteger(0);
 
@@ -246,7 +330,9 @@ public class MultiThreadedEventProcessorTest {
         CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
             new LogContext(),
             "event-processor-",
-            2
+            2,
+            Time.SYSTEM,
+            mock(GroupCoordinatorRuntimeMetrics.class)
         );
 
         eventProcessor.close();
@@ -260,7 +346,9 @@ public class MultiThreadedEventProcessorTest {
         try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(
             new LogContext(),
             "event-processor-",
-            1 // Use a single thread to block event in the processor.
+            1, // Use a single thread to block event in the processor.
+            Time.SYSTEM,
+            mock(GroupCoordinatorRuntimeMetrics.class)
         )) {
             AtomicInteger numEventsExecuted = new AtomicInteger(0);
 
@@ -317,4 +405,147 @@ public class MultiThreadedEventProcessorTest {
             assertEquals(1, numEventsExecuted.get());
         }
     }
+
+    @Test
+    public void testMetrics() throws Exception {
+        GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+        Time mockTime = new MockTime();
+        AtomicInteger numEventsExecuted = new AtomicInteger(0);
+
+        // Special event which blocks until the latch is released.
+        FutureEvent<Integer> blockingEvent = new FutureEvent<>(
+            new TopicPartition("foo", 0), () -> {
+                mockTime.sleep(4000L);
+                return numEventsExecuted.incrementAndGet();
+            },
+            true,
+            mockTime.milliseconds()
+        );
+
+        try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(
+            new LogContext(),
+            "event-processor-",
+            1, // Use a single thread to block event in the processor.
+            mockTime,
+            mockRuntimeMetrics,
+            new MockEventAccumulator<>(mockTime, 500L)
+        )) {
+            // Enqueue the blocking event.
+            eventProcessor.enqueue(blockingEvent);
+
+            // Ensure that the blocking event is executed.
+            waitForCondition(() -> numEventsExecuted.get() > 0,
+                "Blocking event not executed.");
+
+            // Enqueue the other event.
+            FutureEvent<Integer> otherEvent = new FutureEvent<>(
+                new TopicPartition("foo", 0), () -> {
+                mockTime.sleep(5000L);
+                return numEventsExecuted.incrementAndGet();
+            },
+                false,
+                mockTime.milliseconds()
+            );
+
+            eventProcessor.enqueue(otherEvent);
+
+            // Pass the time.
+            mockTime.sleep(3000L);
+
+            // Events should not be completed.
+            assertFalse(otherEvent.future.isDone());
+
+            // Release the blocking event to unblock the thread.
+            blockingEvent.release();
+
+            // The blocking event should be completed.
+            blockingEvent.future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.SECONDS);
+            assertTrue(blockingEvent.future.isDone());
+            assertFalse(blockingEvent.future.isCompletedExceptionally());
+
+            // The other event should also be completed.
+            otherEvent.future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.SECONDS);
+            assertTrue(otherEvent.future.isDone());
+            assertFalse(otherEvent.future.isCompletedExceptionally());
+            assertEquals(2, numEventsExecuted.get());
+
+            // e1 poll time = 500
+            // e1 processing time = 4000
+            // e2 enqueue time = 3000
+            // e2 poll time = 500
+            // e2 processing time = 5000
+
+            // e1 poll time / e1 poll time
+            verify(mockRuntimeMetrics, times(1)).recordThreadIdleRatio(1.0);
+            // e1 poll time
+            verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(500L);
+            // e1 processing time + e2 enqueue time
+            verify(mockRuntimeMetrics, times(1)).recordEventQueueProcessingTime(7000L);
+
+            // Second event (e2)
+
+            // idle ratio = e2 poll time / (e1 poll time + e1 processing time + e2 enqueue time + e2 poll time)
+            verify(mockRuntimeMetrics, times(1)).recordThreadIdleRatio(500.0 / (500.0 + 7000.0 + 500.0));
+            // event queue time = e2 enqueue time + e2 poll time
+            verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(3500L);
+            // e2 processing time
+            verify(mockRuntimeMetrics, times(1)).recordEventQueueProcessingTime(5000L);
+        }
+    }
+
+    @Test
+    public void testRecordThreadIdleRatioTwoThreads() throws Exception {
+        GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+        try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
+            new LogContext(),
+            "event-processor-",
+            2,
+            Time.SYSTEM,
+            mockRuntimeMetrics,
+            new MockEventAccumulator<>(Time.SYSTEM, 100L)
+        )) {
+            List<Double> recordedRatios = new ArrayList<>();
+            AtomicInteger numEventsExecuted = new AtomicInteger(0);
+            ArgumentCaptor<Double> ratioCaptured = ArgumentCaptor.forClass(Double.class);
+            doAnswer(invocation -> {
+                double threadIdleRatio = ratioCaptured.getValue();
+                assertTrue(threadIdleRatio > 0.0);
+                synchronized (recordedRatios) {
+                    recordedRatios.add(threadIdleRatio);
+                }
+                return null;
+            }).when(mockRuntimeMetrics).recordThreadIdleRatio(ratioCaptured.capture());
+
+            List<FutureEvent<Integer>> events = Arrays.asList(
+                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet)
+            );
+
+            events.forEach(eventProcessor::enqueue);
+
+            CompletableFuture.allOf(events
+                .stream()
+                .map(FutureEvent::future)
+                .toArray(CompletableFuture[]::new)
+            ).get(10, TimeUnit.SECONDS);
+
+            events.forEach(event -> {
+                assertTrue(event.future.isDone());
+                assertFalse(event.future.isCompletedExceptionally());
+            });
+
+            assertEquals(events.size(), numEventsExecuted.get());
+            verify(mockRuntimeMetrics, times(7)).recordThreadIdleRatio(anyDouble());
+
+            assertEquals(7, recordedRatios.size());
+            double average = recordedRatios.stream().mapToDouble(Double::doubleValue).sum() / 7;
+            assertTrue(average > 0.0 && average < 1.0);
+        }
+    }
 }