You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/01 08:49:58 UTC
[kafka] branch 2.4 updated: MINOR: Correctly mark offset expiry in
GroupMetadataManager's OffsetExpired metric
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new b7cab2d MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric
b7cab2d is described below
commit b7cab2d66f017464d3242525643a2afe2c28e924
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Fri Nov 1 14:18:48 2019 +0530
MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric
We would mistakenly increment the `OffsetCommits` metric instead
Author: Stanislav Kozlovski <st...@outlook.com>
Reviewers: David Jacot <dj...@confluent.io>, Manikumar Reddy <ma...@gmail.com>
Closes #7624 from stanislavkozlovski/minor-fix-group-coordinator-offset-expiry-metric
(cherry picked from commit 72282ed1988e33000f91672a72c7356e26658241)
Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
.../scala/kafka/coordinator/group/GroupCoordinator.scala | 14 +++++++-------
.../kafka/coordinator/group/GroupMetadataManager.scala | 2 +-
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 22f15f9..24a1780 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int,
def handleDeleteOffsets(groupId: String, partitions: Seq[TopicPartition]): (Errors, Map[TopicPartition, Errors]) = {
var groupError: Errors = Errors.NONE
var partitionErrors: Map[TopicPartition, Errors] = Map()
- var partitionEligibleForDeletion: Seq[TopicPartition] = Seq()
+ var partitionsEligibleForDeletion: Seq[TopicPartition] = Seq()
validateGroupStatus(groupId, ApiKeys.OFFSET_DELETE) match {
case Some(error) =>
@@ -565,13 +565,13 @@ class GroupCoordinator(val brokerId: Int,
Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR
case Empty =>
- partitionEligibleForDeletion = partitions
+ partitionsEligibleForDeletion = partitions
case PreparingRebalance | CompletingRebalance | Stable if group.isConsumerGroup =>
val (consumed, notConsumed) =
partitions.partition(tp => group.isSubscribedToTopic(tp.topic()))
- partitionEligibleForDeletion = notConsumed
+ partitionsEligibleForDeletion = notConsumed
partitionErrors = consumed.map(_ -> Errors.GROUP_SUBSCRIBED_TO_TOPIC).toMap
case _ =>
@@ -579,16 +579,16 @@ class GroupCoordinator(val brokerId: Int,
}
}
- if (partitionEligibleForDeletion.nonEmpty) {
+ if (partitionsEligibleForDeletion.nonEmpty) {
val offsetsRemoved = groupManager.cleanupGroupMetadata(Seq(group), group => {
- group.removeOffsets(partitionEligibleForDeletion)
+ group.removeOffsets(partitionsEligibleForDeletion)
})
- partitionErrors ++= partitionEligibleForDeletion.map(_ -> Errors.NONE).toMap
+ partitionErrors ++= partitionsEligibleForDeletion.map(_ -> Errors.NONE).toMap
offsetDeletionSensor.record(offsetsRemoved)
- info(s"The following offsets of the group $groupId were deleted: ${partitionEligibleForDeletion.mkString(", ")}. " +
+ info(s"The following offsets of the group $groupId were deleted: ${partitionsEligibleForDeletion.mkString(", ")}. " +
s"A total of $offsetsRemoved offsets were removed.")
}
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 2cbf7c8..a143669 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -766,7 +766,7 @@ class GroupMetadataManager(brokerId: Int,
val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
})
- offsetCommitsSensor.record(numOffsetsRemoved)
+ offsetExpiredSensor.record(numOffsetsRemoved)
info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
}