You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/05/31 22:00:10 UTC

[kafka] branch trunk updated: KAFKA-10060 GroupMetadataManager should not log if there are no offsets to expire (#8767)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36aa366  KAFKA-10060 GroupMetadataManager should not log if there are no offsets to expire (#8767)
36aa366 is described below

commit 36aa3664343ee8a72b27d4ee6b1ccc8ce323d421
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Mon Jun 1 05:59:40 2020 +0800

    KAFKA-10060 GroupMetadataManager should not log if there are no offsets to expire (#8767)
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../coordinator/group/GroupMetadataManager.scala   |  3 +-
 .../group/GroupMetadataManagerTest.scala           | 41 +++++++++++++++++-----
 2 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index aba6a5f..48806e6 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -786,7 +786,8 @@ class GroupMetadataManager(brokerId: Int,
       group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
     })
     offsetExpiredSensor.record(numOffsetsRemoved)
-    info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
+    if (numOffsetsRemoved > 0)
+      info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 6266d7f..6f7360f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -57,7 +57,6 @@ class GroupMetadataManagerTest {
   var replicaManager: ReplicaManager = null
   var groupMetadataManager: GroupMetadataManager = null
   var scheduler: KafkaScheduler = null
-  var zkClient: KafkaZkClient = null
   var partition: Partition = null
   var defaultOffsetRetentionMs = Long.MaxValue
   var metrics: kMetrics = null
@@ -71,11 +70,9 @@ class GroupMetadataManagerTest {
   val sessionTimeout = 10000
   val defaultRequireStable = false
 
-  @Before
-  def setUp(): Unit = {
+  private val offsetConfig = {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
-
-    val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+    OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
       offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
@@ -85,22 +82,48 @@ class GroupMetadataManagerTest {
       offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+  }
 
-    defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs
-
+  private def mockKafkaZkClient: KafkaZkClient = {
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
     EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
     EasyMock.replay(zkClient)
+    zkClient
+  }
 
+  @Before
+  def setUp(): Unit = {
+    defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs
     metrics = new kMetrics()
     time = new MockTime
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time, metrics)
+    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager,
+      mockKafkaZkClient, time, metrics)
     partition = EasyMock.niceMock(classOf[Partition])
   }
 
   @Test
+  def testLogInfoFromCleanupGroupMetadata(): Unit = {
+    var expiredOffsets: Int = 0
+    var infoCount = 0
+    val gmm = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, mockKafkaZkClient, time, metrics) {
+      override def cleanupGroupMetadata(groups: Iterable[GroupMetadata],
+                                        selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = expiredOffsets
+
+      override def info(msg: => String): Unit = infoCount += 1
+    }
+
+    // if there are no offsets to expire, we skip logging
+    gmm.cleanupGroupMetadata()
+    assertEquals(0, infoCount)
+    // if there are offsets to expire, we should log info
+    expiredOffsets = 100
+    gmm.cleanupGroupMetadata()
+    assertEquals(1, infoCount)
+  }
+
+  @Test
   def testLoadOffsetsWithoutGroup(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L