Posted to dev@kafka.apache.org by "Lucas Bradstreet (Jira)" <ji...@apache.org> on 2020/02/06 01:21:00 UTC

[jira] [Created] (KAFKA-9513) Failed GroupMetadataManager loadGroupAndOffsets will consider groups as loaded

Lucas Bradstreet created KAFKA-9513:
---------------------------------------

             Summary: Failed GroupMetadataManager loadGroupAndOffsets will consider groups as loaded
                 Key: KAFKA-9513
                 URL: https://issues.apache.org/jira/browse/KAFKA-9513
             Project: Kafka
          Issue Type: Improvement
            Reporter: Lucas Bradstreet


 

Bugs in group loading, such as https://issues.apache.org/jira/browse/KAFKA-8896, may cause errors while loading offsets. The finally block in loadGroupsAndOffsets adds the offsets partition to ownedPartitions and removes it from loadingPartitions even if the load does not succeed.
{code:java}
private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
  try {
    val startMs = time.milliseconds()
    doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
    val endMs = time.milliseconds()
    val timeLapse = endMs - startMs
    partitionLoadSensor.record(timeLapse, endMs, false)
    info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse milliseconds.")
  } catch {
    case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
  } finally {
    inLock(partitionLock) {
      ownedPartitions.add(topicPartition.partition)
      loadingPartitions.remove(topicPartition.partition)
    }
  }
}
{code}
This means that the group is no longer reported as loading, and is therefore considered loaded, by:
{code:java}
def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId))
{code}
 

As a result, consumers may be able to load the wrong offsets.

We should consider being more defensive and marking the partition as failed instead.
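
A minimal sketch of what "mark the partition as failed" could look like, using a simplified stand-in for the partition bookkeeping rather than the real GroupMetadataManager. The names PartitionLoadSketch, failedPartitions, isPartitionFailed and the doLoad parameter are hypothetical and only illustrate the idea; how a failed partition should then be surfaced to clients is left open.
{code:java}
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for the bookkeeping in GroupMetadataManager.
// failedPartitions and isPartitionFailed are assumptions, not existing Kafka members.
object PartitionLoadSketch {
  private val lock = new Object
  private val loadingPartitions = mutable.Set[Int]()
  private val ownedPartitions = mutable.Set[Int]()
  private val failedPartitions = mutable.Set[Int]()

  // doLoad stands in for doLoadGroupsAndOffsets and may throw.
  def loadPartition(partition: Int)(doLoad: Int => Unit): Unit = {
    lock.synchronized {
      failedPartitions.remove(partition) // a retry clears an earlier failure
      loadingPartitions.add(partition)
    }
    var succeeded = false
    try {
      doLoad(partition)
      succeeded = true
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"Error loading offsets from partition $partition: $e")
    } finally {
      lock.synchronized {
        loadingPartitions.remove(partition)
        // Only a successful load makes the partition owned.
        if (succeeded) ownedPartitions.add(partition)
        else failedPartitions.add(partition)
      }
    }
  }

  def isPartitionLoading(partition: Int): Boolean =
    lock.synchronized(loadingPartitions.contains(partition))

  def isPartitionOwned(partition: Int): Boolean =
    lock.synchronized(ownedPartitions.contains(partition))

  def isPartitionFailed(partition: Int): Boolean =
    lock.synchronized(failedPartitions.contains(partition))
}
{code}
With this shape, a partition whose load threw is neither loading nor owned, so callers can tell "loaded" apart from "load failed" instead of treating both as loaded.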


