You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/03/09 01:34:12 UTC
kafka git commit: KAFKA-3343; Use NoTimestamp in GroupMetadataManager when message v0 is used.
Repository: kafka
Updated Branches:
refs/heads/trunk 5afa16601 -> c4282371d
KAFKA-3343; Use NoTimestamp in GroupMetadataManager when message v0 is used.
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #1023 from becketqin/KAFKA-3343
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4282371
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4282371
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4282371
Branch: refs/heads/trunk
Commit: c4282371d954d7ae6decd32252d85f0d2a254e8c
Parents: 5afa166
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue Mar 8 18:34:07 2016 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 8 18:34:07 2016 -0600
----------------------------------------------------------------------
.../coordinator/GroupMetadataManager.scala | 24 ++++++++++++--------
1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4282371/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index cbdb854..2c0236e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -143,8 +143,9 @@ class GroupMetadataManager(val brokerId: Int,
// if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
// retry removing this group.
val groupPartition = partitionFor(group.groupId)
+ val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)
val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
- timestamp = time.milliseconds(), magicValue = getMessageFormatVersion(groupPartition))
+ timestamp = timestamp, magicValue = magicValue)
val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
partitionOpt.foreach { partition =>
@@ -169,12 +170,12 @@ class GroupMetadataManager(val brokerId: Int,
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Short => Unit): DelayedStore = {
+ val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
val message = new Message(
key = GroupMetadataManager.groupMetadataKey(group.groupId),
bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment),
- timestamp = time.milliseconds(),
- magicValue = getMessageFormatVersion(partitionFor(group.groupId))
- )
+ timestamp = timestamp,
+ magicValue = magicValue)
val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
@@ -253,11 +254,12 @@ class GroupMetadataManager(val brokerId: Int,
// construct the message set to append
val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+ val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId))
new Message(
key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
- timestamp = time.milliseconds(),
- magicValue = getMessageFormatVersion(partitionFor(groupId))
+ timestamp = timestamp,
+ magicValue = magicValue
)
}.toSeq
@@ -557,8 +559,8 @@ class GroupMetadataManager(val brokerId: Int,
val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
- (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = time.milliseconds(),
- magicValue = getMessageFormatVersion(offsetsPartition)))
+ val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
+ (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue))
}.groupBy { case (partition, tombstone) => partition }
// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
@@ -627,11 +629,13 @@ class GroupMetadataManager(val brokerId: Int,
config.offsetsTopicNumPartitions
}
- private def getMessageFormatVersion(partition: Int): Byte = {
+ private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
- replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
+ val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
}
+ val timestamp = if (messageFormatVersion == Message.MagicValue_V0) Message.NoTimestamp else time.milliseconds()
+ (messageFormatVersion, timestamp)
}
/**