You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/13 23:10:41 UTC
kafka git commit: KAFKA-2833; print only group offset / metadata according to the formatter
Repository: kafka
Updated Branches:
refs/heads/trunk a26dbcdf3 -> 43ef0150b
KAFKA-2833; print only group offset / metadata according to the formatter
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #527 from guozhangwang/K2833
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43ef0150
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43ef0150
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43ef0150
Branch: refs/heads/trunk
Commit: 43ef0150bdc4ca349af61bf4c75dfb8fd8d14691
Parents: a26dbcd
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Nov 13 14:10:38 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 13 14:10:38 2015 -0800
----------------------------------------------------------------------
.../scala/kafka/coordinator/GroupMetadata.scala | 4 ++
.../coordinator/GroupMetadataManager.scala | 54 ++++++++++++++++----
.../kafka/coordinator/MemberMetadata.scala | 3 ++
3 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/43ef0150/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index ece9ce0..4fa656e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -236,4 +236,8 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
.format(groupId, GroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
}
+
+ override def toString = {
+ "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/43ef0150/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 4ac2c7a..027abf7 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -62,7 +62,7 @@ class GroupMetadataManager(val brokerId: Int,
/* group metadata cache */
private val groupsCache = new Pool[String, GroupMetadata]
- /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE the group lock if needed */
+ /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE offsetExpireLock and the group lock if needed */
private val loadingPartitions: mutable.Set[Int] = mutable.Set()
/* partitions of consumer groups that are assigned, using the same loading partition lock */
@@ -410,6 +410,14 @@ class GroupMetadataManager(val brokerId: Int,
if (groupMetadata != null) {
trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
updateGroup(groupId, groupMetadata)
+ } else {
+ // this is a tombstone mark, we need to delete the group from cache if it exists
+ val group = groupsCache.remove(groupId)
+ if (group != null) {
+ group synchronized {
+ group.transitionTo(Dead)
+ }
+ }
}
}
@@ -932,12 +940,34 @@ object GroupMetadataManager {
// (specify --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
class OffsetsMessageFormatter extends MessageFormatter {
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
- val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key)).toString
- val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
- output.write(formattedKey.getBytes)
- output.write("::".getBytes)
- output.write(formattedValue.getBytes)
- output.write("\n".getBytes)
+ val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
+
+ // only print if the message is an offset record
+ if (formattedKey.isInstanceOf[OffsetKey]) {
+ val groupTopicPartition = formattedKey.asInstanceOf[OffsetKey].toString
+ val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
+ output.write(groupTopicPartition.getBytes)
+ output.write("::".getBytes)
+ output.write(formattedValue.getBytes)
+ output.write("\n".getBytes)
+ }
+ }
+ }
+
+ // Formatter for use with tools to read group metadata history
+ class GroupMetadataMessageFormatter extends MessageFormatter {
+ def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+ val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
+
+ // only print if the message is a group metadata record
+ if (formattedKey.isInstanceOf[GroupKey]) {
+ val groupId = formattedKey.asInstanceOf[GroupKey].key
+ val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+ output.write(groupId.getBytes)
+ output.write("::".getBytes)
+ output.write(formattedValue.getBytes)
+ output.write("\n".getBytes)
+ }
}
}
}
@@ -956,7 +986,13 @@ trait BaseKey{
def key: Object
}
-case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey
+case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
+
+ override def toString = key.toString
+}
+
+case class GroupKey(version: Short, key: String) extends BaseKey {
-case class GroupKey(version: Short, key: String) extends BaseKey
+ override def toString = key
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43ef0150/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 80782c8..1d799f2 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -113,4 +113,7 @@ private[coordinator] class MemberMetadata(val memberId: String,
}
}
+ override def toString = {
+ "[%s,%s,%s,%d]".format(memberId, clientId, clientHost, sessionTimeoutMs)
+ }
}