You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/07/14 15:41:12 UTC
[kafka] branch trunk updated: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer (#13991)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32ff347b2c0 KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer (#13991)
32ff347b2c0 is described below
commit 32ff347b2c0ca21b9c567ab4cfe54869d7148e28
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Jul 14 17:41:06 2023 +0200
KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer (#13991)
This patch wires the new group coordinator in BrokerServer (KRaft only). With this, it is now possible to run a cluster with the new group coordinator and to use the ConsumerGroupHeartbeat API by specifying the following two properties:
- group.coordinator.new.enable = true (to enable the new group coordinator)
- unstable.api.versions.enable = true (to enable unreleased APIs)
Note that the new group coordinator does not support all the existing APIs yet.
Reviewers: Jeff Kim <je...@confluent.io>, Justine Olshan <jo...@confluent.io>
---
checkstyle/import-control-server-common.xml | 2 +
.../src/main/scala/kafka/server/BrokerServer.scala | 62 ++++++++++++---
core/src/main/scala/kafka/server/KafkaConfig.scala | 5 +-
.../server/ConsumerGroupHeartbeatRequestTest.scala | 92 ++++++++++++++++++++--
.../coordinator/group/GroupCoordinatorService.java | 6 +-
.../coordinator/group/GroupMetadataManager.java | 30 +++----
.../group/runtime/CoordinatorRuntime.java | 2 +-
.../coordinator/group/util/SystemTimerReaper.java | 81 +++++++++++++++++++
.../group/GroupMetadataManagerTest.java | 13 +++
.../group/util/SystemTimerReaperTest.java | 65 +++++++++++++++
10 files changed, 322 insertions(+), 36 deletions(-)
diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
index 2e65ec601a3..8e96ea9390c 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -92,6 +92,8 @@
<subpackage name="timer">
<allow class="org.apache.kafka.server.util.MockTime" />
+ <allow class="org.apache.kafka.server.util.ShutdownableThread" />
+ <allow class="org.apache.kafka.test.TestUtils" />
</subpackage>
</subpackage>
</subpackage>
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index edb645f561c..8641a74824b 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.cluster.EndPoint
-import kafka.coordinator.group.GroupCoordinatorAdapter
+import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter}
import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
@@ -38,7 +38,9 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException, TopicPartition}
-import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.coordinator.group
+import org.apache.kafka.coordinator.group.util.SystemTimerReaper
+import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.{BrokerState, VersionRange}
@@ -48,6 +50,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
+import org.apache.kafka.server.util.timer.SystemTimer
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
@@ -284,14 +287,7 @@ class BrokerServer(
tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
- // Create group coordinator, but don't start it until we've started replica manager.
- // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
- groupCoordinator = GroupCoordinatorAdapter(
- config,
- replicaManager,
- Time.SYSTEM,
- metrics
- )
+ groupCoordinator = createGroupCoordinator()
val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
@@ -522,6 +518,52 @@ class BrokerServer(
}
}
+ private def createGroupCoordinator(): GroupCoordinator = {
+ // Create group coordinator, but don't start it until we've started replica manager.
+ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good
+ // to fix the underlying issue.
+ if (config.isNewGroupCoordinatorEnabled) {
+ val time = Time.SYSTEM
+ val serde = new RecordSerde
+ val groupCoordinatorConfig = new GroupCoordinatorConfig(
+ config.groupCoordinatorNumThreads,
+ config.consumerGroupSessionTimeoutMs,
+ config.consumerGroupHeartbeatIntervalMs,
+ config.consumerGroupMaxSize,
+ config.consumerGroupAssignors,
+ config.offsetsTopicSegmentBytes
+ )
+ val timer = new SystemTimerReaper(
+ "group-coordinator-reaper",
+ new SystemTimer("group-coordinator")
+ )
+ val loader = new CoordinatorLoaderImpl[group.Record](
+ replicaManager,
+ serde,
+ config.offsetsLoadBufferSize
+ )
+ val writer = new CoordinatorPartitionWriter[group.Record](
+ replicaManager,
+ serde,
+ config.offsetsTopicCompressionType,
+ time
+ )
+ new GroupCoordinatorService.Builder(config.brokerId, groupCoordinatorConfig)
+ .withTime(time)
+ .withTimer(timer)
+ .withLoader(loader)
+ .withWriter(writer)
+ .build()
+ } else {
+ GroupCoordinatorAdapter(
+ config,
+ replicaManager,
+ Time.SYSTEM,
+ metrics
+ )
+ }
+ }
+
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
if (config.logDirs.size > 1) {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 752b76cb7b1..2214be13d03 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor, RangeAssignor}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
@@ -175,7 +176,7 @@ object Defaults {
val ConsumerGroupMinHeartbeatIntervalMs = 5000
val ConsumerGroupMaxHeartbeatIntervalMs = 15000
val ConsumerGroupMaxSize = Int.MaxValue
- val ConsumerGroupAssignors = ""
+ val ConsumerGroupAssignors = List(classOf[RangeAssignor].getName).asJava
/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize
@@ -1934,7 +1935,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val consumerGroupMinHeartbeatIntervalMs = getInt(KafkaConfig.ConsumerGroupMinHeartbeatIntervalMsProp)
val consumerGroupMaxHeartbeatIntervalMs = getInt(KafkaConfig.ConsumerGroupMaxHeartbeatIntervalMsProp)
val consumerGroupMaxSize = getInt(KafkaConfig.ConsumerGroupMaxSizeProp)
- val consumerGroupAssignors = getList(KafkaConfig.ConsumerGroupAssignorsProp)
+ val consumerGroupAssignors = getConfiguredInstances(KafkaConfig.ConsumerGroupAssignorsProp, classOf[PartitionAssignor])
/** ********* Offset management configuration ***********/
val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 60f8d75157c..7982fa00885 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -19,15 +19,19 @@ package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.utils.TestUtils
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertThrows}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import java.io.EOFException
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
- @ClusterTest(serverProperties = Array(
+ @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
- new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true")
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit = {
- val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+
+ // Create the __consumer_offsets topic explicitly. It is not created automatically
+ // in this test because the test does not use the FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // to a nonexistent topic.
+ var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
).build()
- val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
- val expectedResponse = new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
- assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
+
+ // Create the topic.
+ val topicId = TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Prepare the next heartbeat.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // This is the expected assignment.
+ val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setAssignedTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Heartbeats until the partitions are assigned.
+ consumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
+
+ // Verify the response.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
+
+ // Leave the group.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
+ .setMemberEpoch(-1)
+ ).build()
+
+ consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+
+ // Verify the response.
+ assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
}
private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): ConsumerGroupHeartbeatResponse = {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index fd3969b3957..324a3e8a057 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -128,8 +128,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
if (time == null)
throw new IllegalArgumentException("Time must be set.");
- String logPrefix = String.format("GroupCoordinator id=%d ", nodeId);
- LogContext logContext = new LogContext(String.format("[%s ]", logPrefix));
+ String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
+ LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
CoordinatorBuilderSupplier<ReplicatedGroupCoordinator, Record> supplier = () ->
new ReplicatedGroupCoordinator.Builder(config);
@@ -142,6 +142,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime =
new CoordinatorRuntime.Builder<ReplicatedGroupCoordinator, Record>()
+ .withTime(time)
+ .withTimer(timer)
.withLogPrefix(logPrefix)
.withLogContext(logContext)
.withEventProcessor(processor)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 1ea0aeb78ec..c0804ff6bdc 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1187,20 +1187,22 @@ public class GroupMetadataManager {
// Notify all the groups subscribed to the created, updated or
// deleted topics.
- Set<String> allGroupIds = new HashSet<>();
- delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
- String topicName = topicDelta.name();
- allGroupIds.addAll(groupsSubscribedToTopic(topicName));
- });
- delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
- TopicImage topicImage = delta.image().topics().getTopic(topicId);
- allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
- });
- allGroupIds.forEach(groupId -> {
- Group group = groups.get(groupId);
- if (group != null && group.type() == Group.GroupType.CONSUMER) {
- ((ConsumerGroup) group).requestMetadataRefresh();
- }
+ Optional.ofNullable(delta.topicsDelta()).ifPresent(topicsDelta -> {
+ Set<String> allGroupIds = new HashSet<>();
+ topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
+ String topicName = topicDelta.name();
+ allGroupIds.addAll(groupsSubscribedToTopic(topicName));
+ });
+ topicsDelta.deletedTopicIds().forEach(topicId -> {
+ TopicImage topicImage = delta.image().topics().getTopic(topicId);
+ allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
+ });
+ allGroupIds.forEach(groupId -> {
+ Group group = groups.get(groupId);
+ if (group != null && group.type() == Group.GroupType.CONSUMER) {
+ ((ConsumerGroup) group).requestMetadataRefresh();
+ }
+ });
});
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 9b5b32f7741..81598893196 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -886,7 +886,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
@Override
public void complete(Throwable exception) {
if (exception != null) {
- log.error("Execution of {} failed due to {}.", name, exception);
+ log.error("Execution of {} failed due to {}.", name, exception.getMessage());
}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java
new file mode 100644
index 00000000000..0b8eb6a2f71
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+ private static final long WORK_TIMEOUT_MS = 200L;
+
+ class Reaper extends ShutdownableThread {
+ Reaper(String name) {
+ super(name, false);
+ }
+
+ @Override
+ public void doWork() {
+ try {
+ timer.advanceClock(WORK_TIMEOUT_MS);
+ } catch (InterruptedException ex) {
+ // Ignore.
+ }
+ }
+ }
+
+ private final Timer timer;
+ private final Reaper reaper;
+
+ public SystemTimerReaper(String reaperThreadName, Timer timer) {
+ this.timer = timer;
+ this.reaper = new Reaper(reaperThreadName);
+ this.reaper.start();
+ }
+
+ @Override
+ public void add(TimerTask timerTask) {
+ timer.add(timerTask);
+ }
+
+ @Override
+ public boolean advanceClock(long timeoutMs) throws InterruptedException {
+ return timer.advanceClock(timeoutMs);
+ }
+
+ @Override
+ public int size() {
+ return timer.size();
+ }
+
+ @Override
+ public void close() throws Exception {
+ reaper.initiateShutdown();
+ // Improve shutdown time by waking up the reaper thread
+ // blocked on poll by sending a no-op.
+ timer.add(new TimerTask(0) {
+ @Override
+ public void run() {}
+ });
+ reaper.awaitShutdown();
+ timer.close();
+ }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4d192c7fe81..12d53c92efa 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -2360,6 +2360,19 @@ public class GroupMetadataManagerTest {
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
}
+ @Test
+ public void testOnNewMetadataImageWithEmptyDelta() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+ .build();
+
+ MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
+
+ context.groupMetadataManager.onNewMetadataImage(image, delta);
+ assertEquals(image, context.groupMetadataManager.image());
+ }
+
@Test
public void testOnNewMetadataImage() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java
new file mode 100644
index 00000000000..4c2a5c5f79a
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class SystemTimerReaperTest {
+ private static class FutureTimerTask<T> extends TimerTask {
+ CompletableFuture<T> future = new CompletableFuture<>();
+
+ FutureTimerTask(long delayMs) {
+ super(delayMs);
+ }
+
+ @Override
+ public void run() {
+ // We use org.apache.kafka.common.errors.TimeoutException to differentiate
+ // from java.util.concurrent.TimeoutException.
+ future.completeExceptionally(new TimeoutException(
+ String.format("Future failed to be completed before timeout of %sMs ms was reached", delayMs)));
+ }
+ }
+
+ private <T> CompletableFuture<T> add(Timer timer, long delayMs) {
+ FutureTimerTask<T> task = new FutureTimerTask<>(delayMs);
+ timer.add(task);
+ return task.future;
+ }
+
+ @Test
+ public void testReaper() throws Exception {
+ Timer timer = new SystemTimerReaper("reaper", new SystemTimer("timer"));
+ try {
+ CompletableFuture<Void> t1 = add(timer, 100L);
+ CompletableFuture<Void> t2 = add(timer, 200L);
+ CompletableFuture<Void> t3 = add(timer, 300L);
+ TestUtils.assertFutureThrows(t1, TimeoutException.class);
+ TestUtils.assertFutureThrows(t2, TimeoutException.class);
+ TestUtils.assertFutureThrows(t3, TimeoutException.class);
+ } finally {
+ timer.close();
+ }
+ }
+}