You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/01 08:49:58 UTC

[kafka] branch 2.4 updated: MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new b7cab2d  MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric
b7cab2d is described below

commit b7cab2d66f017464d3242525643a2afe2c28e924
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Fri Nov 1 14:18:48 2019 +0530

    MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric
    
    Expired offsets were mistakenly recorded in the `OffsetCommits` metric instead of the `OffsetExpired` metric.
    
    Author: Stanislav Kozlovski <st...@outlook.com>
    
    Reviewers: David Jacot <dj...@confluent.io>, Manikumar Reddy <ma...@gmail.com>
    
    Closes #7624 from stanislavkozlovski/minor-fix-group-coordinator-offset-expiry-metric
    
    (cherry picked from commit 72282ed1988e33000f91672a72c7356e26658241)
    Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
 .../scala/kafka/coordinator/group/GroupCoordinator.scala   | 14 +++++++-------
 .../kafka/coordinator/group/GroupMetadataManager.scala     |  2 +-
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 22f15f9..24a1780 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition]): (Errors, Map[TopicPartition, Errors]) = {
     var groupError: Errors = Errors.NONE
     var partitionErrors: Map[TopicPartition, Errors] = Map()
-    var partitionEligibleForDeletion: Seq[TopicPartition] = Seq()
+    var partitionsEligibleForDeletion: Seq[TopicPartition] = Seq()
 
     validateGroupStatus(groupId, ApiKeys.OFFSET_DELETE) match {
       case Some(error) =>
@@ -565,13 +565,13 @@ class GroupCoordinator(val brokerId: Int,
                     Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR
 
                 case Empty =>
-                  partitionEligibleForDeletion = partitions
+                  partitionsEligibleForDeletion = partitions
 
                 case PreparingRebalance | CompletingRebalance | Stable if group.isConsumerGroup =>
                   val (consumed, notConsumed) =
                     partitions.partition(tp => group.isSubscribedToTopic(tp.topic()))
 
-                  partitionEligibleForDeletion = notConsumed
+                  partitionsEligibleForDeletion = notConsumed
                   partitionErrors = consumed.map(_ -> Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap
 
                 case _ =>
@@ -579,16 +579,16 @@ class GroupCoordinator(val brokerId: Int,
               }
             }
 
-            if (partitionEligibleForDeletion.nonEmpty) {
+            if (partitionsEligibleForDeletion.nonEmpty) {
               val offsetsRemoved = groupManager.cleanupGroupMetadata(Seq(group), group => {
-                group.removeOffsets(partitionEligibleForDeletion)
+                group.removeOffsets(partitionsEligibleForDeletion)
               })
 
-              partitionErrors ++= partitionEligibleForDeletion.map(_ -> Errors.NONE).toMap
+              partitionErrors ++= partitionsEligibleForDeletion.map(_ -> Errors.NONE).toMap
 
               offsetDeletionSensor.record(offsetsRemoved)
 
-              info(s"The following offsets of the group $groupId were deleted: ${partitionEligibleForDeletion.mkString(", ")}. " +
+              info(s"The following offsets of the group $groupId were deleted: ${partitionsEligibleForDeletion.mkString(", ")}. " +
                 s"A total of $offsetsRemoved offsets were removed.")
             }
         }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 2cbf7c8..a143669 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -766,7 +766,7 @@ class GroupMetadataManager(brokerId: Int,
     val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
       group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
     })
-    offsetCommitsSensor.record(numOffsetsRemoved)
+    offsetExpiredSensor.record(numOffsetsRemoved)
     info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
   }