You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/14 15:41:12 UTC

[kafka] branch trunk updated: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer (#13991)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32ff347b2c0 KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer (#13991)
32ff347b2c0 is described below

commit 32ff347b2c0ca21b9c567ab4cfe54869d7148e28
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Jul 14 17:41:06 2023 +0200

    KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer (#13991)
    
    This patch wires the new group coordinator in BrokerServer (KRaft only). With this, it is now possible to run a cluster with the new group coordinator and to use the ConsumerGroupHeartbeat API by specifying the following two properties:
    - group.coordinator.new.enable = true (to enable the new group coordinator)
    - unstable.api.versions.enable = true (to enable unreleased APIs)
    
    Note that the new group coordinator does not support all the existing APIs yet.
    
    Reviewers: Jeff Kim <je...@confluent.io>, Justine Olshan <jo...@confluent.io>
---
 checkstyle/import-control-server-common.xml        |  2 +
 .../src/main/scala/kafka/server/BrokerServer.scala | 62 ++++++++++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  5 +-
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 92 ++++++++++++++++++++--
 .../coordinator/group/GroupCoordinatorService.java |  6 +-
 .../coordinator/group/GroupMetadataManager.java    | 30 +++----
 .../group/runtime/CoordinatorRuntime.java          |  2 +-
 .../coordinator/group/util/SystemTimerReaper.java  | 81 +++++++++++++++++++
 .../group/GroupMetadataManagerTest.java            | 13 +++
 .../group/util/SystemTimerReaperTest.java          | 65 +++++++++++++++
 10 files changed, 322 insertions(+), 36 deletions(-)

diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index 2e65ec601a3..8e96ea9390c 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -92,6 +92,8 @@
 
             <subpackage name="timer">
                 <allow class="org.apache.kafka.server.util.MockTime" />
+                <allow class="org.apache.kafka.server.util.ShutdownableThread" />
+                <allow class="org.apache.kafka.test.TestUtils" />
             </subpackage>
         </subpackage>
     </subpackage>
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index edb645f561c..8641a74824b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.cluster.EndPoint
-import kafka.coordinator.group.GroupCoordinatorAdapter
+import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter}
 import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
 import kafka.log.LogManager
 import kafka.log.remote.RemoteLogManager
@@ -38,7 +38,9 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException, TopicPartition}
-import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.coordinator.group
+import org.apache.kafka.coordinator.group.util.SystemTimerReaper
+import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
@@ -48,6 +50,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
+import org.apache.kafka.server.util.timer.SystemTimer
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 
@@ -284,14 +287,7 @@ class BrokerServer(
       tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
       tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
 
-      // Create group coordinator, but don't start it until we've started replica manager.
-      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
-      groupCoordinator = GroupCoordinatorAdapter(
-        config,
-        replicaManager,
-        Time.SYSTEM,
-        metrics
-      )
+      groupCoordinator = createGroupCoordinator()
 
       val producerIdManagerSupplier = () => ProducerIdManager.rpc(
         config.brokerId,
@@ -522,6 +518,52 @@ class BrokerServer(
     }
   }
 
+  private def createGroupCoordinator(): GroupCoordinator = {
+    // Create group coordinator, but don't start it until we've started replica manager.
+    // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good
+    // to fix the underlying issue.
+    if (config.isNewGroupCoordinatorEnabled) {
+      val time = Time.SYSTEM
+      val serde = new RecordSerde
+      val groupCoordinatorConfig = new GroupCoordinatorConfig(
+        config.groupCoordinatorNumThreads,
+        config.consumerGroupSessionTimeoutMs,
+        config.consumerGroupHeartbeatIntervalMs,
+        config.consumerGroupMaxSize,
+        config.consumerGroupAssignors,
+        config.offsetsTopicSegmentBytes
+      )
+      val timer = new SystemTimerReaper(
+        "group-coordinator-reaper",
+        new SystemTimer("group-coordinator")
+      )
+      val loader = new CoordinatorLoaderImpl[group.Record](
+        replicaManager,
+        serde,
+        config.offsetsLoadBufferSize
+      )
+      val writer = new CoordinatorPartitionWriter[group.Record](
+        replicaManager,
+        serde,
+        config.offsetsTopicCompressionType,
+        time
+      )
+      new GroupCoordinatorService.Builder(config.brokerId, groupCoordinatorConfig)
+        .withTime(time)
+        .withTimer(timer)
+        .withLoader(loader)
+        .withWriter(writer)
+        .build()
+    } else {
+      GroupCoordinatorAdapter(
+        config,
+        replicaManager,
+        Time.SYSTEM,
+        metrics
+      )
+    }
+  }
+
   protected def createRemoteLogManager(): Option[RemoteLogManager] = {
     if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
       if (config.logDirs.size > 1) {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 752b76cb7b1..2214be13d03 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor, RangeAssignor}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
@@ -175,7 +176,7 @@ object Defaults {
   val ConsumerGroupMinHeartbeatIntervalMs = 5000
   val ConsumerGroupMaxHeartbeatIntervalMs = 15000
   val ConsumerGroupMaxSize = Int.MaxValue
-  val ConsumerGroupAssignors = ""
+  val ConsumerGroupAssignors = List(classOf[RangeAssignor].getName).asJava
 
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize
@@ -1934,7 +1935,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val consumerGroupMinHeartbeatIntervalMs = getInt(KafkaConfig.ConsumerGroupMinHeartbeatIntervalMsProp)
   val consumerGroupMaxHeartbeatIntervalMs = getInt(KafkaConfig.ConsumerGroupMaxHeartbeatIntervalMsProp)
   val consumerGroupMaxSize = getInt(KafkaConfig.ConsumerGroupMaxSizeProp)
-  val consumerGroupAssignors = getList(KafkaConfig.ConsumerGroupAssignorsProp)
+  val consumerGroupAssignors = getConfiguredInstances(KafkaConfig.ConsumerGroupAssignorsProp, classOf[PartitionAssignor])
 
   /** ********* Offset management configuration ***********/
   val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 60f8d75157c..7982fa00885 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -19,15 +19,19 @@ package kafka.server
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.utils.TestUtils
 import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertThrows}
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.io.EOFException
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
     assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
     new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
-    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true")
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
   ))
   def testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit = {
-    val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val admin = cluster.createAdminClient()
+
+    // Creates the __consumer_offsets topics because it won't be created automatically
+    // in this test because it does not use FindCoordinator API.
+    TestUtils.createOffsetsTopicWithAdmin(
+      admin = admin,
+      brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala
+    )
+
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
       new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
     ).build()
 
-    val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
-    val expectedResponse = new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-    assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
+
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
+
+    // Create the topic.
+    val topicId = TestUtils.createTopicWithAdminRaw(
+      admin = admin,
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Prepare the next heartbeat.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+    ).build()
+
+    // This is the expected assignment.
+    val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setAssignedTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+    // Heartbeats until the partitions are assigned.
+    consumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+        consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+    }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
+
+    // Verify the response.
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
+
+    // Leave the group.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+        .setMemberEpoch(-1)
+    ).build()
+
+    consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+
+    // Verify the response.
+    assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
   private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): ConsumerGroupHeartbeatResponse = {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index fd3969b3957..324a3e8a057 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -128,8 +128,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
             if (time == null)
                 throw new IllegalArgumentException("Time must be set.");
 
-            String logPrefix = String.format("GroupCoordinator id=%d ", nodeId);
-            LogContext logContext = new LogContext(String.format("[%s ]", logPrefix));
+            String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
+            LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
 
             CoordinatorBuilderSupplier<ReplicatedGroupCoordinator, Record> supplier = () ->
                 new ReplicatedGroupCoordinator.Builder(config);
@@ -142,6 +142,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
 
             CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime =
                 new CoordinatorRuntime.Builder<ReplicatedGroupCoordinator, Record>()
+                    .withTime(time)
+                    .withTimer(timer)
                     .withLogPrefix(logPrefix)
                     .withLogContext(logContext)
                     .withEventProcessor(processor)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 1ea0aeb78ec..c0804ff6bdc 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1187,20 +1187,22 @@ public class GroupMetadataManager {
 
         // Notify all the groups subscribed to the created, updated or
         // deleted topics.
-        Set<String> allGroupIds = new HashSet<>();
-        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
-            String topicName = topicDelta.name();
-            allGroupIds.addAll(groupsSubscribedToTopic(topicName));
-        });
-        delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
-            TopicImage topicImage = delta.image().topics().getTopic(topicId);
-            allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
-        });
-        allGroupIds.forEach(groupId -> {
-            Group group = groups.get(groupId);
-            if (group != null && group.type() == Group.GroupType.CONSUMER) {
-                ((ConsumerGroup) group).requestMetadataRefresh();
-            }
+        Optional.ofNullable(delta.topicsDelta()).ifPresent(topicsDelta -> {
+            Set<String> allGroupIds = new HashSet<>();
+            topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
+                String topicName = topicDelta.name();
+                allGroupIds.addAll(groupsSubscribedToTopic(topicName));
+            });
+            topicsDelta.deletedTopicIds().forEach(topicId -> {
+                TopicImage topicImage = delta.image().topics().getTopic(topicId);
+                allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
+            });
+            allGroupIds.forEach(groupId -> {
+                Group group = groups.get(groupId);
+                if (group != null && group.type() == Group.GroupType.CONSUMER) {
+                    ((ConsumerGroup) group).requestMetadataRefresh();
+                }
+            });
         });
     }
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 9b5b32f7741..81598893196 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -886,7 +886,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
         @Override
         public void complete(Throwable exception) {
             if (exception != null) {
-                log.error("Execution of {} failed due to {}.", name, exception);
+                log.error("Execution of {} failed due to {}.", name, exception.getMessage());
             }
         }
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java
new file mode 100644
index 00000000000..0b8eb6a2f71
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+    private static final long WORK_TIMEOUT_MS = 200L;
+
+    class Reaper extends ShutdownableThread {
+        Reaper(String name) {
+            super(name, false);
+        }
+
+        @Override
+        public void doWork() {
+            try {
+                timer.advanceClock(WORK_TIMEOUT_MS);
+            } catch (InterruptedException ex) {
+                // Ignore.
+            }
+        }
+    }
+
+    private final Timer timer;
+    private final Reaper reaper;
+
+    public SystemTimerReaper(String reaperThreadName, Timer timer) {
+        this.timer = timer;
+        this.reaper = new Reaper(reaperThreadName);
+        this.reaper.start();
+    }
+
+    @Override
+    public void add(TimerTask timerTask) {
+        timer.add(timerTask);
+    }
+
+    @Override
+    public boolean advanceClock(long timeoutMs) throws InterruptedException {
+        return timer.advanceClock(timeoutMs);
+    }
+
+    @Override
+    public int size() {
+        return timer.size();
+    }
+
+    @Override
+    public void close() throws Exception {
+        reaper.initiateShutdown();
+        // Improve shutdown time by waking up the reaper thread
+        // blocked on poll by sending a no-op.
+        timer.add(new TimerTask(0) {
+            @Override
+            public void run() {}
+        });
+        reaper.awaitShutdown();
+        timer.close();
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4d192c7fe81..12d53c92efa 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -2360,6 +2360,19 @@ public class GroupMetadataManagerTest {
         assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
     }
 
+    @Test
+    public void testOnNewMetadataImageWithEmptyDelta() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+            .build();
+
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
+
+        context.groupMetadataManager.onNewMetadataImage(image, delta);
+        assertEquals(image, context.groupMetadataManager.image());
+    }
+
     @Test
     public void testOnNewMetadataImage() {
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java
new file mode 100644
index 00000000000..4c2a5c5f79a
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class SystemTimerReaperTest {
+    private static class FutureTimerTask<T> extends TimerTask {
+        CompletableFuture<T> future = new CompletableFuture<>();
+
+        FutureTimerTask(long delayMs) {
+            super(delayMs);
+        }
+
+        @Override
+        public void run() {
+            // We use org.apache.kafka.common.errors.TimeoutException to differentiate
+            // from java.util.concurrent.TimeoutException.
+            future.completeExceptionally(new TimeoutException(
+                String.format("Future failed to be completed before timeout of %sMs ms was reached", delayMs)));
+        }
+    }
+
+    private <T> CompletableFuture<T> add(Timer timer, long delayMs) {
+        FutureTimerTask<T> task = new FutureTimerTask<>(delayMs);
+        timer.add(task);
+        return task.future;
+    }
+
+    @Test
+    public void testReaper() throws Exception {
+        Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
+        try {
+            CompletableFuture<Void> t1 = add(timer, 100L);
+            CompletableFuture<Void> t2 = add(timer, 200L);
+            CompletableFuture<Void> t3 = add(timer, 300L);
+            TestUtils.assertFutureThrows(t1, TimeoutException.class);
+            TestUtils.assertFutureThrows(t2, TimeoutException.class);
+            TestUtils.assertFutureThrows(t3, TimeoutException.class);
+        } finally {
+            timer.close();
+        }
+    }
+}