You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/09 01:34:12 UTC

kafka git commit: KAFKA-3343; Use NoTimestamp in GroupMetadataManager when message v0 is used.

Repository: kafka
Updated Branches:
  refs/heads/trunk 5afa16601 -> c4282371d


KAFKA-3343; Use NoTimestamp in GroupMetadataManager when message v0 is used.

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #1023 from becketqin/KAFKA-3343


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4282371
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4282371
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4282371

Branch: refs/heads/trunk
Commit: c4282371d954d7ae6decd32252d85f0d2a254e8c
Parents: 5afa166
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue Mar 8 18:34:07 2016 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 8 18:34:07 2016 -0600

----------------------------------------------------------------------
 .../coordinator/GroupMetadataManager.scala      | 24 ++++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4282371/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index cbdb854..2c0236e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -143,8 +143,9 @@ class GroupMetadataManager(val brokerId: Int,
       // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
       // retry removing this group.
       val groupPartition = partitionFor(group.groupId)
+      val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
       val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
-        timestamp = time.milliseconds(), magicValue = getMessageFormatVersion(groupPartition))
+        timestamp = timestamp, magicValue = magicValue)
 
       val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
       partitionOpt.foreach { partition =>
@@ -169,12 +170,12 @@ class GroupMetadataManager(val brokerId: Int,
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
                         responseCallback: Short => Unit): DelayedStore = {
+    val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
     val message = new Message(
       key = GroupMetadataManager.groupMetadataKey(group.groupId),
       bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment),
-      timestamp = time.milliseconds(),
-      magicValue = getMessageFormatVersion(partitionFor(group.groupId))
-    )
+      timestamp = timestamp,
+      magicValue = magicValue)
 
     val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
 
@@ -253,11 +254,12 @@ class GroupMetadataManager(val brokerId: Int,
 
     // construct the message set to append
     val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+      val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId))
       new Message(
         key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
         bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
-        timestamp = time.milliseconds(),
-        magicValue = getMessageFormatVersion(partitionFor(groupId))
+        timestamp = timestamp,
+        magicValue = magicValue
       )
     }.toSeq
 
@@ -557,8 +559,8 @@ class GroupMetadataManager(val brokerId: Int,
         val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
           groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
 
-        (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = time.milliseconds(),
-          magicValue = getMessageFormatVersion(offsetsPartition)))
+        val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
+        (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue))
       }.groupBy { case (partition, tombstone) => partition }
 
       // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
@@ -627,11 +629,13 @@ class GroupMetadataManager(val brokerId: Int,
       config.offsetsTopicNumPartitions
   }
 
-  private def getMessageFormatVersion(partition: Int): Byte = {
+  private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
     val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
-    replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
+    val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
       throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
     }
+    val timestamp = if (messageFormatVersion == Message.MagicValue_V0) Message.NoTimestamp else time.milliseconds()
+    (messageFormatVersion, timestamp)
   }
 
   /**