You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/13 23:10:41 UTC

kafka git commit: KAFKA-2833; print only group offset / metadata according to the formatter

Repository: kafka
Updated Branches:
  refs/heads/trunk a26dbcdf3 -> 43ef0150b


KAFKA-2833; print only group offset / metadata according to the formatter

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #527 from guozhangwang/K2833


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43ef0150
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43ef0150
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43ef0150

Branch: refs/heads/trunk
Commit: 43ef0150bdc4ca349af61bf4c75dfb8fd8d14691
Parents: a26dbcd
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Nov 13 14:10:38 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 13 14:10:38 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/coordinator/GroupMetadata.scala |  4 ++
 .../coordinator/GroupMetadataManager.scala      | 54 ++++++++++++++++----
 .../kafka/coordinator/MemberMetadata.scala      |  3 ++
 3 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/43ef0150/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index ece9ce0..4fa656e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -236,4 +236,8 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
       throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
         .format(groupId, GroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
   }
+
+  override def toString = {
+    "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/43ef0150/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 4ac2c7a..027abf7 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -62,7 +62,7 @@ class GroupMetadataManager(val brokerId: Int,
   /* group metadata cache */
   private val groupsCache = new Pool[String, GroupMetadata]
 
-  /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE the group lock if needed */
+  /* partitions of consumer groups that are being loaded; its lock should always be acquired BEFORE offsetExpireLock and the group lock if needed */
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
 
   /* partitions of consumer groups that are assigned, using the same loading partition lock */
@@ -410,6 +410,14 @@ class GroupMetadataManager(val brokerId: Int,
                     if (groupMetadata != null) {
                       trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
                       updateGroup(groupId, groupMetadata)
+                    } else {
+                      // this is a tombstone marker; we need to delete the group from the cache if it exists
+                      val group = groupsCache.remove(groupId)
+                      if (group != null) {
+                        group synchronized {
+                          group.transitionTo(Dead)
+                        }
+                      }
                     }
                   }
 
@@ -932,12 +940,34 @@ object GroupMetadataManager {
   // (specify --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
   class OffsetsMessageFormatter extends MessageFormatter {
     def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
-      val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key)).toString
-      val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
-      output.write(formattedKey.getBytes)
-      output.write("::".getBytes)
-      output.write(formattedValue.getBytes)
-      output.write("\n".getBytes)
+      val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
+
+      // only print if the message is an offset record
+      if (formattedKey.isInstanceOf[OffsetKey]) {
+        val groupTopicPartition = formattedKey.asInstanceOf[OffsetKey].toString
+        val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
+        output.write(groupTopicPartition.getBytes)
+        output.write("::".getBytes)
+        output.write(formattedValue.getBytes)
+        output.write("\n".getBytes)
+      }
+    }
+  }
+
+  // Formatter for use with tools to read group metadata history
+  class GroupMetadataMessageFormatter extends MessageFormatter {
+    def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+      val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
+
+      // only print if the message is a group metadata record
+      if (formattedKey.isInstanceOf[GroupKey]) {
+        val groupId = formattedKey.asInstanceOf[GroupKey].key
+        val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+        output.write(groupId.getBytes)
+        output.write("::".getBytes)
+        output.write(formattedValue.getBytes)
+        output.write("\n".getBytes)
+      }
     }
   }
 }
@@ -956,7 +986,13 @@ trait BaseKey{
   def key: Object
 }
 
-case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey
+case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
+
+  override def toString = key.toString
+}
+
+case class GroupKey(version: Short, key: String) extends BaseKey {
 
-case class GroupKey(version: Short, key: String) extends BaseKey
+  override def toString = key
+}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43ef0150/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 80782c8..1d799f2 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -113,4 +113,7 @@ private[coordinator] class MemberMetadata(val memberId: String,
     }
   }
 
+  override def toString = {
+    "[%s,%s,%s,%d]".format(memberId, clientId, clientHost, sessionTimeoutMs)
+  }
 }