Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/22 16:19:26 UTC

[kafka] branch trunk updated: MINOR: Refactor GroupMetadataManager cleanupGroupMetadata (#4504)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1cabef0  MINOR: Refactor GroupMetadataManager cleanupGroupMetadata (#4504)
1cabef0 is described below

commit 1cabef0d3dc7a3c245f260b8d34a60d7d044bb9c
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Thu Feb 22 08:19:13 2018 -0800

    MINOR: Refactor GroupMetadataManager cleanupGroupMetadata (#4504)
    
    Refactoring avoids the need to call this method with infinity as the current time in order to remove all group offsets (when manually deleting the group).
---
 .../kafka/coordinator/group/GroupCoordinator.scala | 11 ++++--
 .../kafka/coordinator/group/GroupMetadata.scala    |  3 +-
 .../coordinator/group/GroupMetadataManager.scala   | 39 ++++++++++++----------
 3 files changed, 32 insertions(+), 21 deletions(-)
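
The hunks below change three call sites (manual group deletion, deleted-partition cleanup, and periodic offset expiration) so that each passes a selector function that performs the deletion and returns the removed offsets, instead of encoding the intent in an Option[Seq[TopicPartition]] plus a timestamp. A rough sketch of the new call shape for the delete-group path, lifted from the GroupCoordinator hunks that follow (surrounding coordinator code omitted, so this is not a standalone program):

    // Before: deleting a group meant "expire every offset as of time = Long.MaxValue".
    groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue)

    // After: the caller states its deletion policy directly and gets back the number of offsets removed.
    val offsetsRemoved = groupManager.cleanupGroupMetadata(eligibleGroups, group => group.removeAllOffsets())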

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 5ae8552..4e605e2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -375,9 +375,11 @@ class GroupCoordinator(val brokerId: Int,
       }
 
       if (eligibleGroups.nonEmpty) {
-        groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue)
+        val offsetsRemoved = groupManager.cleanupGroupMetadata(eligibleGroups, group => {
+          group.removeAllOffsets()
+        })
         groupErrors ++= eligibleGroups.map(_.groupId -> Errors.NONE).toMap
-        info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}")
+        info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}. A total of $offsetsRemoved offsets were removed.")
       }
 
       groupErrors
@@ -568,7 +570,10 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
-    groupManager.cleanupGroupMetadata(Some(topicPartitions), groupManager.currentGroups, time.milliseconds())
+    val offsetsRemoved = groupManager.cleanupGroupMetadata(groupManager.currentGroups, group => {
+      group.removeOffsets(topicPartitions)
+    })
+    info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.")
   }
 
   private def validateGroup(groupId: String): Option[Errors] = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 07d14f4..2b9c91f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -421,9 +421,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def hasPendingOffsetCommitsFromProducer(producerId: Long) =
     pendingTransactionalOffsetCommits.contains(producerId)
 
+  def removeAllOffsets(): immutable.Map[TopicPartition, OffsetAndMetadata] = removeOffsets(offsets.keySet.toSeq)
+
   def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
     topicPartitions.flatMap { topicPartition =>
-
       pendingOffsetCommits.remove(topicPartition)
       pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
         pendingOffsets.remove(topicPartition)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3391fc3..3b79544 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -713,22 +713,27 @@ class GroupMetadataManager(brokerId: Int,
 
   // visible for testing
   private[group] def cleanupGroupMetadata(): Unit = {
-    cleanupGroupMetadata(None, groupMetadataCache.values, time.milliseconds())
+    val startMs = time.milliseconds()
+    val offsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
+      group.removeExpiredOffsets(time.milliseconds())
+    })
+    info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
   }
 
-  def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]],
-                           groups: Iterable[GroupMetadata],
-                           startMs: Long) {
+  /**
+    * This function is used to clean up group offsets given the groups and also a function that performs the offset deletion.
+    * @param groups Groups whose metadata are to be cleaned up
+    * @param selector A function that implements deletion of (all or part of) group offsets. This function is called while
+    *                 a group lock is held, therefore there is no need for the caller to also obtain a group lock.
+    * @return The cumulative number of offsets removed
+    */
+  def cleanupGroupMetadata(groups: Iterable[GroupMetadata], selector: GroupMetadata => Map[TopicPartition, OffsetAndMetadata]): Int = {
     var offsetsRemoved = 0
 
     groups.foreach { group =>
       val groupId = group.groupId
       val (removedOffsets, groupIsDead, generation) = group.inLock {
-        val removedOffsets = deletedTopicPartitions match {
-          case Some(topicPartitions) => group.removeOffsets(topicPartitions)
-          case None => group.removeExpiredOffsets(startMs)
-        }
-
+        val removedOffsets = selector(group)
         if (group.is(Empty) && !group.hasOffsets) {
           info(s"Group $groupId transitioned to Dead in generation ${group.generationId}")
           group.transitionTo(Dead)
@@ -736,13 +741,13 @@ class GroupMetadataManager(brokerId: Int,
         (removedOffsets, group.is(Dead), group.generationId)
       }
 
-      val offsetsPartition = partitionFor(groupId)
-      val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
-      getMagic(offsetsPartition) match {
-        case Some(magicValue) =>
-          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
-          val timestampType = TimestampType.CREATE_TIME
-          val timestamp = time.milliseconds()
+      val offsetsPartition = partitionFor(groupId)
+      val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+      getMagic(offsetsPartition) match {
+        case Some(magicValue) =>
+          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+          val timestampType = TimestampType.CREATE_TIME
+          val timestamp = time.milliseconds()
 
           replicaManager.nonOfflinePartition(appendPartition).foreach { partition =>
             val tombstones = ListBuffer.empty[SimpleRecord]
@@ -788,7 +793,7 @@ class GroupMetadataManager(brokerId: Int,
       }
     }
 
-    info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
+    offsetsRemoved
   }
 
   def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean) {

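For readers skimming the patch, here is a deliberately simplified, self-contained model of the selector-based signature documented in the new scaladoc above. The object, class, and value names below are illustrative stand-ins, not the real Kafka classes, and the actual method additionally appends tombstone records and runs the selector while holding the group lock:

    // Simplified sketch of the selector pattern (illustrative types, not the Kafka classes).
    object CleanupGroupMetadataSketch {
      final case class TopicPartition(topic: String, partition: Int)
      final case class OffsetAndMetadata(offset: Long)

      // Minimal stand-in for GroupMetadata: just a mutable map of committed offsets.
      final class Group(val groupId: String,
                        private var offsets: Map[TopicPartition, OffsetAndMetadata]) {
        def removeAllOffsets(): Map[TopicPartition, OffsetAndMetadata] =
          removeOffsets(offsets.keys.toSeq)

        def removeOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
          val removed = offsets.filter { case (tp, _) => topicPartitions.contains(tp) }
          offsets = offsets -- topicPartitions
          removed
        }
      }

      // The refactored shape: the caller supplies a selector that performs the deletion and
      // returns what was removed; the manager only counts (and, in the real code, persists
      // tombstones for) the removed offsets.
      def cleanupGroupMetadata(groups: Iterable[Group],
                               selector: Group => Map[TopicPartition, OffsetAndMetadata]): Int =
        groups.iterator.map(group => selector(group).size).sum

      def main(args: Array[String]): Unit = {
        val tp0 = TopicPartition("t", 0)
        val tp1 = TopicPartition("t", 1)
        val group = new Group("g1", Map(tp0 -> OffsetAndMetadata(5L), tp1 -> OffsetAndMetadata(9L)))

        // Deleted-partitions path: drop offsets only for the affected partitions.
        val removedForPartitions = cleanupGroupMetadata(Seq(group), _.removeOffsets(Seq(tp0)))
        // Delete-group path: drop whatever offsets remain; no Long.MaxValue sentinel needed.
        val removedForGroupDelete = cleanupGroupMetadata(Seq(group), _.removeAllOffsets())

        println(s"${group.groupId}: removed $removedForPartitions then $removedForGroupDelete offsets")
      }
    }

The benefit of this shape is that locking, tombstone bookkeeping, and the removed-offset count stay in one place in GroupMetadataManager, while each caller decides which offsets to drop.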
-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.