You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/05/31 22:00:10 UTC
[kafka] branch trunk updated: KAFKA-10060 GroupMetadataManager
should not log if there are no offsets to expire (#8767)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36aa366 KAFKA-10060 GroupMetadataManager should not log if there are no offsets to expire (#8767)
36aa366 is described below
commit 36aa3664343ee8a72b27d4ee6b1ccc8ce323d421
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Mon Jun 1 05:59:40 2020 +0800
KAFKA-10060 GroupMetadataManager should not log if there are no offsets to expire (#8767)
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../coordinator/group/GroupMetadataManager.scala | 3 +-
.../group/GroupMetadataManagerTest.scala | 41 +++++++++++++++++-----
2 files changed, 34 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index aba6a5f..48806e6 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -786,7 +786,8 @@ class GroupMetadataManager(brokerId: Int,
group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
})
offsetExpiredSensor.record(numOffsetsRemoved)
- info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
+ if (numOffsetsRemoved > 0)
+ info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
}
/**
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 6266d7f..6f7360f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -57,7 +57,6 @@ class GroupMetadataManagerTest {
var replicaManager: ReplicaManager = null
var groupMetadataManager: GroupMetadataManager = null
var scheduler: KafkaScheduler = null
- var zkClient: KafkaZkClient = null
var partition: Partition = null
var defaultOffsetRetentionMs = Long.MaxValue
var metrics: kMetrics = null
@@ -71,11 +70,9 @@ class GroupMetadataManagerTest {
val sessionTimeout = 10000
val defaultRequireStable = false
- @Before
- def setUp(): Unit = {
+ private val offsetConfig = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
-
- val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+ OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
@@ -85,22 +82,48 @@ class GroupMetadataManagerTest {
offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+ }
- defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs
-
+ private def mockKafkaZkClient: KafkaZkClient = {
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator
- zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
EasyMock.replay(zkClient)
+ zkClient
+ }
+ @Before
+ def setUp(): Unit = {
+ defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs
metrics = new kMetrics()
time = new MockTime
replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
- groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time, metrics)
+ groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager,
+ mockKafkaZkClient, time, metrics)
partition = EasyMock.niceMock(classOf[Partition])
}
@Test
+ def testLogInfoFromCleanupGroupMetadata(): Unit = {
+ var expiredOffsets: Int = 0
+ var infoCount = 0
+ val gmm = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, mockKafkaZkClient, time, metrics) {
+ override def cleanupGroupMetadata(groups: Iterable[GroupMetadata],
+ selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = expiredOffsets
+
+ override def info(msg: => String): Unit = infoCount += 1
+ }
+
+ // if there are no offsets to expire, we skip logging
+ gmm.cleanupGroupMetadata()
+ assertEquals(0, infoCount)
+ // if there are offsets to expire, we should log info
+ expiredOffsets = 100
+ gmm.cleanupGroupMetadata()
+ assertEquals(1, infoCount)
+ }
+
+ @Test
def testLoadOffsetsWithoutGroup(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val startOffset = 15L