You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/22 16:19:26 UTC
[kafka] branch trunk updated: MINOR: Refactor GroupMetadataManager
cleanupGroupMetadata (#4504)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1cabef0 MINOR: Refactor GroupMetadataManager cleanupGroupMetadata (#4504)
1cabef0 is described below
commit 1cabef0d3dc7a3c245f260b8d34a60d7d044bb9c
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Thu Feb 22 08:19:13 2018 -0800
MINOR: Refactor GroupMetadataManager cleanupGroupMetadata (#4504)
Refactoring avoids the need to call this method with a infinity as current time to remove all group offsets (when manually deleting the group).
---
.../kafka/coordinator/group/GroupCoordinator.scala | 11 ++++--
.../kafka/coordinator/group/GroupMetadata.scala | 3 +-
.../coordinator/group/GroupMetadataManager.scala | 39 ++++++++++++----------
3 files changed, 32 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 5ae8552..4e605e2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -375,9 +375,11 @@ class GroupCoordinator(val brokerId: Int,
}
if (eligibleGroups.nonEmpty) {
- groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue)
+ val offsetsRemoved = groupManager.cleanupGroupMetadata(eligibleGroups, group => {
+ group.removeAllOffsets()
+ })
groupErrors ++= eligibleGroups.map(_.groupId -> Errors.NONE).toMap
- info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}")
+ info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}. A total of $offsetsRemoved offsets were removed.")
}
groupErrors
@@ -568,7 +570,10 @@ class GroupCoordinator(val brokerId: Int,
}
def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
- groupManager.cleanupGroupMetadata(Some(topicPartitions), groupManager.currentGroups, time.milliseconds())
+ val offsetsRemoved = groupManager.cleanupGroupMetadata(groupManager.currentGroups, group => {
+ group.removeOffsets(topicPartitions)
+ })
+ info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.")
}
private def validateGroup(groupId: String): Option[Errors] = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 07d14f4..2b9c91f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -421,9 +421,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def hasPendingOffsetCommitsFromProducer(producerId: Long) =
pendingTransactionalOffsetCommits.contains(producerId)
+ def removeAllOffsets(): immutable.Map[TopicPartition, OffsetAndMetadata] = removeOffsets(offsets.keySet.toSeq)
+
def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
topicPartitions.flatMap { topicPartition =>
-
pendingOffsetCommits.remove(topicPartition)
pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
pendingOffsets.remove(topicPartition)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3391fc3..3b79544 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -713,22 +713,27 @@ class GroupMetadataManager(brokerId: Int,
// visible for testing
private[group] def cleanupGroupMetadata(): Unit = {
- cleanupGroupMetadata(None, groupMetadataCache.values, time.milliseconds())
+ val startMs = time.milliseconds()
+ val offsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
+ group.removeExpiredOffsets(time.milliseconds())
+ })
+ info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
}
- def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]],
- groups: Iterable[GroupMetadata],
- startMs: Long) {
+ /**
+ * This function is used to clean up group offsets given the groups and also a function that performs the offset deletion.
+ * @param groups Groups whose metadata are to be cleaned up
+ * @param selector A function that implements deletion of (all or part of) group offsets. This function is called while
+ * a group lock is held, therefore there is no need for the caller to also obtain a group lock.
+ * @return The cumulative number of offsets removed
+ */
+ def cleanupGroupMetadata(groups: Iterable[GroupMetadata], selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = {
var offsetsRemoved = 0
groups.foreach { group =>
val groupId = group.groupId
val (removedOffsets, groupIsDead, generation) = group.inLock {
- val removedOffsets = deletedTopicPartitions match {
- case Some(topicPartitions) => group.removeOffsets(topicPartitions)
- case None => group.removeExpiredOffsets(startMs)
- }
-
+ val removedOffsets = selector(group)
if (group.is(Empty) && !group.hasOffsets) {
info(s"Group $groupId transitioned to Dead in generation ${group.generationId}")
group.transitionTo(Dead)
@@ -736,13 +741,13 @@ class GroupMetadataManager(brokerId: Int,
(removedOffsets, group.is(Dead), group.generationId)
}
- val offsetsPartition = partitionFor(groupId)
- val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
- getMagic(offsetsPartition) match {
- case Some(magicValue) =>
- // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
- val timestampType = TimestampType.CREATE_TIME
- val timestamp = time.milliseconds()
+ val offsetsPartition = partitionFor(groupId)
+ val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+ getMagic(offsetsPartition) match {
+ case Some(magicValue) =>
+ // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ val timestampType = TimestampType.CREATE_TIME
+ val timestamp = time.milliseconds()
replicaManager.nonOfflinePartition(appendPartition).foreach { partition =>
val tombstones = ListBuffer.empty[SimpleRecord]
@@ -788,7 +793,7 @@ class GroupMetadataManager(brokerId: Int,
}
}
- info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
+ offsetsRemoved
}
def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean) {
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.