You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/09 22:30:53 UTC

[kafka] branch 1.0 updated: KAFKA-6622; Fix performance issue loading consumer offsets (#4661)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 3935464  KAFKA-6622; Fix performance issue loading consumer offsets (#4661)
3935464 is described below

commit 3935464202e6898bdc63f1eca005da2b92100c07
Author: Radai Rosenblatt <ra...@gmail.com>
AuthorDate: Fri Mar 9 14:21:06 2018 -0800

    KAFKA-6622; Fix performance issue loading consumer offsets (#4661)
    
    `batch.baseOffset` is an expensive operation (even says so in its javadoc), and yet was called for every single record in a batch when loading offsets. This means that for N records in a gzipped batch, the entire batch will be unzipped N times. The fix is to compute and cache the base offset once as we decompress and process the batch.
    
    Reviewers: Dong Lin <li...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/coordinator/group/GroupMetadataManager.scala  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index dbae86f..fae08e2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -509,8 +509,11 @@ class GroupMetadataManager(brokerId: Int,
               }
               pendingOffsets.remove(batch.producerId)
             } else {
+              var batchBaseOffset: Option[Long] = None
               for (record <- batch.asScala) {
                 require(record.hasKey, "Group metadata/offset entry key should not be null")
+                if (batchBaseOffset.isEmpty)
+                  batchBaseOffset = Some(record.offset)
                 GroupMetadataManager.readMessageKey(record.key) match {
 
                   case offsetKey: OffsetKey =>
@@ -527,9 +530,9 @@ class GroupMetadataManager(brokerId: Int,
                     } else {
                       val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
                       if (isTxnOffsetCommit)
-                        pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                        pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                       else
-                        loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                        loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                     }
 
                   case groupMetadataKey: GroupMetadataKey =>

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.