You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/07 18:50:27 UTC
kafka git commit: MINOR: A few logging improvements in group
coordinator
Repository: kafka
Updated Branches:
refs/heads/trunk 6aeca1012 -> effda15f1
MINOR: A few logging improvements in group coordinator
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3259 from hachikuji/group-coordinator-logging-improvements
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/effda15f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/effda15f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/effda15f
Branch: refs/heads/trunk
Commit: effda15f1fbe643cde1f610b9d54715c9e2078cf
Parents: 6aeca10
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jun 7 11:50:20 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Jun 7 11:50:20 2017 -0700
----------------------------------------------------------------------
.../coordinator/group/GroupCoordinator.scala | 15 ++++++-----
.../group/GroupMetadataManager.scala | 27 ++++++++------------
.../transaction/TransactionMetadata.scala | 1 -
3 files changed, 19 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/effda15f/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 7c1e002..d9d120c 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -143,7 +143,7 @@ class GroupCoordinator(val brokerId: Int,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {
group synchronized {
- if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
+ if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
// if the new member does not support the group protocol, reject it
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
@@ -513,7 +513,6 @@ class GroupCoordinator(val brokerId: Int,
groupManager.cleanupGroupMetadata(Some(topicPartitions))
}
-
private def validateGroup(groupId: String): Option[Errors] = {
if (!isActive.get)
Some(Errors.COORDINATOR_NOT_AVAILABLE)
@@ -685,14 +684,16 @@ class GroupCoordinator(val brokerId: Int,
new DelayedJoin(this, group, group.rebalanceTimeoutMs)
group.transitionTo(PreparingRebalance)
- info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
+
+ info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
+ s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
val groupKey = GroupKey(group.groupId)
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
- trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
+ debug(s"Member ${member.memberId} in group ${group.groupId} has failed")
group.remove(member.memberId)
group.currentState match {
case Dead | Empty =>
@@ -724,7 +725,8 @@ class GroupCoordinator(val brokerId: Int,
if (!group.is(Dead)) {
group.initNextGeneration()
if (group.is(Empty)) {
- info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
+ info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
+ s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
groupManager.storeGroup(group, Map.empty, error => {
if (error != Errors.NONE) {
@@ -735,7 +737,8 @@ class GroupCoordinator(val brokerId: Int,
}
})
} else {
- info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
+ info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
+ s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
// trigger the awaiting join group response callback for all the members after rebalancing
for (member <- group.allMemberMetadata) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/effda15f/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index db3d936..a8419fd 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -437,10 +437,10 @@ class GroupMetadataManager(brokerId: Int,
*/
def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+ info(s"Scheduling loading of offsets and group metadata from $topicPartition")
+ scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
def doLoadGroupsAndOffsets() {
- info(s"Loading offsets and group metadata from $topicPartition")
-
inLock(partitionLock) {
if (loadingPartitions.contains(offsetsPartition)) {
info(s"Offset load from $topicPartition already in progress.")
@@ -451,7 +451,9 @@ class GroupMetadataManager(brokerId: Int,
}
try {
+ val startMs = time.milliseconds()
loadGroupsAndOffsets(topicPartition, onGroupLoaded)
+ info(s"Finished loading offsets and group metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds.")
} catch {
case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
} finally {
@@ -461,14 +463,11 @@ class GroupMetadataManager(brokerId: Int,
}
}
}
-
- scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets _)
}
private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
- val startMs = time.milliseconds()
replicaManager.getLog(topicPartition) match {
case None =>
warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
@@ -577,7 +576,7 @@ class GroupMetadataManager(brokerId: Int,
loadedGroups.values.foreach { group =>
val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
- trace(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
+ debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
loadGroup(group, offsets, pendingOffsets)
onGroupLoaded(group)
}
@@ -588,7 +587,7 @@ class GroupMetadataManager(brokerId: Int,
val group = new GroupMetadata(groupId)
val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
- trace(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
+ debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
loadGroup(group, offsets, pendingOffsets)
onGroupLoaded(group)
}
@@ -601,10 +600,6 @@ class GroupMetadataManager(brokerId: Int,
throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
s"loading partition $topicPartition")
}
-
- if (!shuttingDown.get())
- info("Finished loading offsets from %s in %d milliseconds."
- .format(topicPartition, time.milliseconds() - startMs))
}
}
}
@@ -641,7 +636,8 @@ class GroupMetadataManager(brokerId: Int,
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
- scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets _)
+ info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
+ scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
def removeGroupsAndOffsets() {
var numOffsetsRemoved = 0
@@ -663,11 +659,8 @@ class GroupMetadataManager(brokerId: Int,
}
}
- if (numOffsetsRemoved > 0)
- info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.")
-
- if (numGroupsRemoved > 0)
- info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.")
+ info(s"Finished unloading $topicPartition. Removed $numOffsetsRemoved cached offsets " +
+ s"and $numGroupsRemoved cached groups.")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/effda15f/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index a92e6be..cc9a2f7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -323,7 +323,6 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
}
case CompleteAbort | CompleteCommit => // from write markers
- info(s"transit start ${transitMetadata.txnStartTimestamp}")
if (!validProducerEpoch(transitMetadata) ||
txnTimeoutMs != transitMetadata.txnTimeoutMs ||
transitMetadata.txnStartTimestamp == -1) {