You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/05/26 12:19:40 UTC

[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206702136


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Hi David and Justine, coming back to this, apologies for the delay. Are you comfortable of using `TopicIdPartition` as suggested above, or just keep `TopicPartition` and do the resolution to `TopicIdPartition` in `KafkaApis`? What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org