Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/09 22:30:53 UTC
[kafka] branch 1.0 updated: KAFKA-6622; Fix performance issue loading consumer offsets (#4661)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 3935464 KAFKA-6622; Fix performance issue loading consumer offsets (#4661)
3935464 is described below
commit 3935464202e6898bdc63f1eca005da2b92100c07
Author: Radai Rosenblatt <ra...@gmail.com>
AuthorDate: Fri Mar 9 14:21:06 2018 -0800
KAFKA-6622; Fix performance issue loading consumer offsets (#4661)
`batch.baseOffset` is an expensive operation (its javadoc says so), yet it was called for every record in a batch when loading offsets. For N records in a gzipped batch, the entire batch was therefore decompressed N times. The fix is to compute and cache the base offset once while decompressing and processing the batch.
Reviewers: Dong Lin <li...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
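The cost described in the commit message can be illustrated with a minimal, self-contained sketch. It does not use the Kafka API: `Record`, `CompressedBatch`, `loadNaive`, `loadCached` and the `decompressions` counter are hypothetical stand-ins, and the sketch assumes that both iterating a batch and reading its `baseOffset` each cost one full decompression pass.

```scala
import scala.collection.mutable

object BaseOffsetSketch {
  // Hypothetical stand-in for a record inside a compressed batch.
  final case class Record(offset: Long, key: String)

  // Hypothetical stand-in for a compressed batch. The counter makes the
  // simulated decompression passes observable.
  final class CompressedBatch(contents: Vector[Record]) {
    var decompressions: Int = 0

    // Assumption: reading the base offset decompresses the whole batch.
    // Like this sketch in general, it expects a non-empty batch.
    def baseOffset: Long = { decompressions += 1; contents.head.offset }

    // Assumption: iterating the records decompresses the batch once.
    def records: Iterator[Record] = { decompressions += 1; contents.iterator }
  }

  // Before the fix: baseOffset is re-read for every record.
  def loadNaive(batch: CompressedBatch): Map[String, Long] = {
    val loaded = mutable.Map.empty[String, Long]
    for (record <- batch.records)
      loaded.put(record.key, batch.baseOffset)
    loaded.toMap
  }

  // After the fix: the first record's offset is cached and reused.
  def loadCached(batch: CompressedBatch): Map[String, Long] = {
    val loaded = mutable.Map.empty[String, Long]
    var batchBaseOffset: Option[Long] = None
    for (record <- batch.records) {
      if (batchBaseOffset.isEmpty)
        batchBaseOffset = Some(record.offset)
      batchBaseOffset.foreach(base => loaded.put(record.key, base))
    }
    loaded.toMap
  }

  def main(args: Array[String]): Unit = {
    val sample = Vector(Record(100L, "a"), Record(101L, "b"), Record(102L, "c"))

    val naive = new CompressedBatch(sample)
    loadNaive(naive)
    println(naive.decompressions) // 4: one iteration pass plus one per record

    val cached = new CompressedBatch(sample)
    loadCached(cached)
    println(cached.decompressions) // 1: the iteration pass only
  }
}
```

Both loaders produce the same mapping; only the number of simulated decompression passes differs, growing with the record count in the naive version and staying constant in the cached one.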
.../main/scala/kafka/coordinator/group/GroupMetadataManager.scala | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index dbae86f..fae08e2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -509,8 +509,11 @@ class GroupMetadataManager(brokerId: Int,
}
pendingOffsets.remove(batch.producerId)
} else {
+ var batchBaseOffset: Option[Long] = None
for (record <- batch.asScala) {
require(record.hasKey, "Group metadata/offset entry key should not be null")
+ if (batchBaseOffset.isEmpty)
+ batchBaseOffset = Some(record.offset)
GroupMetadataManager.readMessageKey(record.key) match {
case offsetKey: OffsetKey =>
@@ -527,9 +530,9 @@ class GroupMetadataManager(brokerId: Int,
} else {
val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
if (isTxnOffsetCommit)
- pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+ pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
else
- loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+ loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
}
case groupMetadataKey: GroupMetadataKey =>