Posted to dev@kafka.apache.org by "Lucas Bradstreet (Jira)" <ji...@apache.org> on 2020/02/06 01:21:00 UTC
[jira] [Created] (KAFKA-9513) Failed GroupMetadataManager loadGroupAndOffsets will consider groups as loaded
Lucas Bradstreet created KAFKA-9513:
---------------------------------------
Summary: Failed GroupMetadataManager loadGroupAndOffsets will consider groups as loaded
Key: KAFKA-9513
URL: https://issues.apache.org/jira/browse/KAFKA-9513
Project: Kafka
Issue Type: Improvement
Reporter: Lucas Bradstreet
Bugs in group loading, such as https://issues.apache.org/jira/browse/KAFKA-8896, may cause errors while loading offsets. The finally block in loadGroupsAndOffsets adds the offsets partition to ownedPartitions and removes it from loadingPartitions even when the load fails.
{code:java}
private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
  try {
    val startMs = time.milliseconds()
    doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
    val endMs = time.milliseconds()
    val timeLapse = endMs - startMs
    partitionLoadSensor.record(timeLapse, endMs, false)
    info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse milliseconds.")
  } catch {
    case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
  } finally {
    inLock(partitionLock) {
      ownedPartitions.add(topicPartition.partition)
      loadingPartitions.remove(topicPartition.partition)
    }
  }
}
{code}
This means that, after a failed load, the group is no longer reported as loading by:
{code:java}
def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId))
{code}
This may result in consumers loading the wrong offsets.
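To make the failure mode concrete, here is a minimal toy model of just the bookkeeping. It is not the real GroupMetadataManager: ToyGroupManager, load, isPartitionLoading and isPartitionOwned are hypothetical names for illustration, and plain synchronized stands in for inLock(partitionLock).

```scala
import scala.collection.mutable

// Toy model of the partition bookkeeping only (hypothetical, not Kafka code).
class ToyGroupManager {
  private val loadingPartitions = mutable.Set[Int]()
  private val ownedPartitions = mutable.Set[Int]()

  def isPartitionLoading(partition: Int): Boolean =
    synchronized { loadingPartitions.contains(partition) }

  def isPartitionOwned(partition: Int): Boolean =
    synchronized { ownedPartitions.contains(partition) }

  // Mirrors the try/catch/finally shape of loadGroupsAndOffsets:
  // the finally block runs whether or not doLoad throws.
  def load(partition: Int)(doLoad: Int => Unit): Unit = {
    synchronized { loadingPartitions.add(partition) }
    try {
      doLoad(partition)
    } catch {
      case t: Throwable =>
        println(s"Error loading offsets from partition $partition: ${t.getMessage}")
    } finally {
      synchronized {
        ownedPartitions.add(partition)
        loadingPartitions.remove(partition)
      }
    }
  }
}

object ToyGroupManagerDemo {
  def main(args: Array[String]): Unit = {
    val manager = new ToyGroupManager
    // Simulate a load that fails part-way through.
    manager.load(0)(_ => throw new IllegalStateException("simulated load failure"))
    println(manager.isPartitionLoading(0)) // false
    println(manager.isPartitionOwned(0))   // true, even though nothing was loaded
  }
}
```

In this model a failed load is indistinguishable from a successful one once the finally block has run, which matches the behaviour described above.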
We should consider whether we should be more defensive and instead mark the partition as failed.
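One possible shape for a more defensive variant is sketched below, again as a toy model rather than a proposed patch. The failedPartitions set, isPartitionFailed, and the DefensiveToyManager class are assumptions made for illustration and do not exist in Kafka; how callers should react to a failed partition (for example, which error to return to consumers) is left open.

```scala
import scala.collection.mutable

// Hypothetical sketch: failedPartitions and isPartitionFailed are not part of Kafka.
class DefensiveToyManager {
  private val loadingPartitions = mutable.Set[Int]()
  private val ownedPartitions = mutable.Set[Int]()
  private val failedPartitions = mutable.Set[Int]()

  def isPartitionLoading(partition: Int): Boolean =
    synchronized { loadingPartitions.contains(partition) }

  def isPartitionOwned(partition: Int): Boolean =
    synchronized { ownedPartitions.contains(partition) }

  def isPartitionFailed(partition: Int): Boolean =
    synchronized { failedPartitions.contains(partition) }

  def load(partition: Int)(doLoad: Int => Unit): Unit = {
    synchronized {
      // A retry clears any earlier failure before loading again.
      failedPartitions.remove(partition)
      loadingPartitions.add(partition)
    }
    // Record the outcome instead of treating every exit path the same way.
    val succeeded =
      try {
        doLoad(partition)
        true
      } catch {
        case t: Throwable =>
          println(s"Error loading offsets from partition $partition: ${t.getMessage}")
          false
      }
    synchronized {
      loadingPartitions.remove(partition)
      // Only a successful load marks the partition as owned.
      if (succeeded) ownedPartitions.add(partition)
      else failedPartitions.add(partition)
    }
  }
}
```

With this shape, a load that throws leaves the partition neither loading nor owned, so a check for ownership can no longer mistake a failed load for a completed one.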