You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/07 18:50:27 UTC

kafka git commit: MINOR: A few logging improvements in group coordinator

Repository: kafka
Updated Branches:
  refs/heads/trunk 6aeca1012 -> effda15f1


MINOR: A few logging improvements in group coordinator

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3259 from hachikuji/group-coordinator-logging-improvements


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/effda15f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/effda15f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/effda15f

Branch: refs/heads/trunk
Commit: effda15f1fbe643cde1f610b9d54715c9e2078cf
Parents: 6aeca10
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jun 7 11:50:20 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Jun 7 11:50:20 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/GroupCoordinator.scala    | 15 ++++++-----
 .../group/GroupMetadataManager.scala            | 27 ++++++++------------
 .../transaction/TransactionMetadata.scala       |  1 -
 3 files changed, 19 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/effda15f/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 7c1e002..d9d120c 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -143,7 +143,7 @@ class GroupCoordinator(val brokerId: Int,
                           protocols: List[(String, Array[Byte])],
                           responseCallback: JoinCallback) {
     group synchronized {
-      if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
+      if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
         // if the new member does not support the group protocol, reject it
         responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
@@ -513,7 +513,6 @@ class GroupCoordinator(val brokerId: Int,
     groupManager.cleanupGroupMetadata(Some(topicPartitions))
   }
 
-
   private def validateGroup(groupId: String): Option[Errors] = {
     if (!isActive.get)
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
@@ -685,14 +684,16 @@ class GroupCoordinator(val brokerId: Int,
       new DelayedJoin(this, group, group.rebalanceTimeoutMs)
 
     group.transitionTo(PreparingRebalance)
-    info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
+
+    info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
+      s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
 
     val groupKey = GroupKey(group.groupId)
     joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
   }
 
   private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
-    trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
+    debug(s"Member ${member.memberId} in group ${group.groupId} has failed")
     group.remove(member.memberId)
     group.currentState match {
       case Dead | Empty =>
@@ -724,7 +725,8 @@ class GroupCoordinator(val brokerId: Int,
       if (!group.is(Dead)) {
         group.initNextGeneration()
         if (group.is(Empty)) {
-          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
+          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
+            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
 
           groupManager.storeGroup(group, Map.empty, error => {
             if (error != Errors.NONE) {
@@ -735,7 +737,8 @@ class GroupCoordinator(val brokerId: Int,
             }
           })
         } else {
-          info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
+          info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
+            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
 
           // trigger the awaiting join group response callback for all the members after rebalancing
           for (member <- group.allMemberMetadata) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/effda15f/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index db3d936..a8419fd 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -437,10 +437,10 @@ class GroupMetadataManager(brokerId: Int,
    */
   def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+    info(s"Scheduling loading of offsets and group metadata from $topicPartition")
+    scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
 
     def doLoadGroupsAndOffsets() {
-      info(s"Loading offsets and group metadata from $topicPartition")
-
       inLock(partitionLock) {
         if (loadingPartitions.contains(offsetsPartition)) {
           info(s"Offset load from $topicPartition already in progress.")
@@ -451,7 +451,9 @@ class GroupMetadataManager(brokerId: Int,
       }
 
       try {
+        val startMs = time.milliseconds()
         loadGroupsAndOffsets(topicPartition, onGroupLoaded)
+        info(s"Finished loading offsets and group metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds.")
       } catch {
         case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
       } finally {
@@ -461,14 +463,11 @@ class GroupMetadataManager(brokerId: Int,
         }
       }
     }
-
-    scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets _)
   }
 
   private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
     def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
 
-    val startMs = time.milliseconds()
     replicaManager.getLog(topicPartition) match {
       case None =>
         warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
@@ -577,7 +576,7 @@ class GroupMetadataManager(brokerId: Int,
           loadedGroups.values.foreach { group =>
             val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
             val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
-            trace(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
+            debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
             loadGroup(group, offsets, pendingOffsets)
             onGroupLoaded(group)
           }
@@ -588,7 +587,7 @@ class GroupMetadataManager(brokerId: Int,
             val group = new GroupMetadata(groupId)
             val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
             val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
-            trace(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
+            debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
             loadGroup(group, offsets, pendingOffsets)
             onGroupLoaded(group)
           }
@@ -601,10 +600,6 @@ class GroupMetadataManager(brokerId: Int,
               throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
                 s"loading partition $topicPartition")
           }
-
-          if (!shuttingDown.get())
-            info("Finished loading offsets from %s in %d milliseconds."
-              .format(topicPartition, time.milliseconds() - startMs))
         }
     }
   }
@@ -641,7 +636,8 @@ class GroupMetadataManager(brokerId: Int,
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
-    scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets _)
+    info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
+    scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
 
     def removeGroupsAndOffsets() {
       var numOffsetsRemoved = 0
@@ -663,11 +659,8 @@ class GroupMetadataManager(brokerId: Int,
         }
       }
 
-      if (numOffsetsRemoved > 0)
-        info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.")
-
-      if (numGroupsRemoved > 0)
-        info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.")
+      info(s"Finished unloading $topicPartition. Removed $numOffsetsRemoved cached offsets " +
+        s"and $numGroupsRemoved cached groups.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/effda15f/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index a92e6be..cc9a2f7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -323,7 +323,6 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
           }
 
         case CompleteAbort | CompleteCommit => // from write markers
-          info(s"transit start ${transitMetadata.txnStartTimestamp}")
           if (!validProducerEpoch(transitMetadata) ||
             txnTimeoutMs != transitMetadata.txnTimeoutMs ||
             transitMetadata.txnStartTimestamp == -1) {