You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/04/05 19:23:42 UTC

[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1158907492


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -291,24 +291,24 @@ private[group] class GroupCoordinatorAdapter(
     topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
     requireStable: Boolean
   ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
-    val topicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+    val topicIdPartitions = new mutable.ArrayBuffer[TopicIdPartition]()
     topics.forEach { topic =>
       topic.partitionIndexes.forEach { partition =>
-        topicPartitions += new TopicPartition(topic.name, partition)
+        topicIdPartitions += new TopicIdPartition(Uuid.ZERO_UUID, partition, topic.name)

Review Comment:
   Yes, you are right. The topic id will come from `OffsetFetchRequestTopics`.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   Until topic ids become persisted in the group metadata, we won't have the topic ID unless it is resolved using the metadata cache. I shouldn't have used the zero id here, thanks for pointing it out!
   
   If we want to maintain the mapping performed in `getOffsets` which is using `TopicIdPartition` as keys, we could instead perform a resolution of topic id from the topic name as done in the `KafkaApis` request-handler for topic-id-aware requests before they are processed. But then we would need to consider the cases where an id cannot be found. I will think about it a bit more.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org