Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/04/03 12:30:36 UTC

[GitHub] [kafka] Hangleton opened a new pull request, #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Hangleton opened a new pull request, #13493:
URL: https://github.com/apache/kafka/pull/13493

   This task is the sibling of [PR-13378](https://github.com/apache/kafka/pull/13378), which propagates topic ids to the group coordinator on the offset commit (write) path. The purpose of this PR is to change the interfaces of the group coordinator and its adapter so that topic ids are propagated in a similar way on the offset fetch path.
   
   [KAFKA-14691](https://issues.apache.org/jira/browse/KAFKA-14691) will add the topic ids to the OffsetFetch API itself so that topic ids are propagated from clients to the coordinator on the offset fetch path. 
   
   Changes to the persisted data model (group metadata and keys) are out of scope.
   There is no functional change in this PR.
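
   For illustration, the core signature change on the offset fetch path looks like this (a sketch extracted from the diff reviewed below; the rest of `GroupMetadataManager` is elided):
   
   ```scala
   // Before: the fetch path is keyed by topic name and partition only.
   // def getOffsets(groupId: String, requireStable: Boolean,
   //                topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData]
   
   // After: both the requested partitions and the returned map carry topic ids.
   // def getOffsets(groupId: String, requireStable: Boolean,
   //                topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData]
   ```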
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1165606409


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   Hi Justine (@jolshan), I pushed a commit with the change to resolve the topic id in the group metadata manager when the offsets of all topic-partitions known by the coordinator are requested. Apologies for the delay.
   
   I don't really like the code I wrote, since it introduces one level of indirection on the valid path and one more failure path when a topic id cannot be resolved, and the proposed implementation is not very elegant. But it follows up on the discussion above.
   
   When the new schemas for metadata and offset records are used, the topic ids will be taken from these records' tags. However, for all current records, there will still be a need for reconciliation. Perhaps this reconciliation should be done when the offsets are loaded by the coordinator, once the in-memory metadata data structure becomes topic-id-aware.
   
   I will add a few more unit tests.
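   
   As a rough sketch of the load-time reconciliation idea (a hypothetical helper, not code in this PR, assuming the name-to-id map comes from `metadataCache.topicNamesToIds()`):
   
   ```scala
   import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
   
   // Hypothetical sketch: resolve topic names to ids once, when the
   // coordinator loads offsets, so the in-memory map can be keyed by
   // TopicIdPartition instead of resolving on every fetch.
   def resolveAtLoadTime[V](
     loadedOffsets: Map[TopicPartition, V],
     topicIds: java.util.Map[String, Uuid]
   ): (Map[TopicIdPartition, V], Seq[TopicPartition]) = {
     val resolved = Map.newBuilder[TopicIdPartition, V]
     val unresolved = Seq.newBuilder[TopicPartition]
     loadedOffsets.foreach { case (tp, value) =>
       Option(topicIds.get(tp.topic)) match {
         case Some(id) => resolved += (new TopicIdPartition(id, tp) -> value)
         case None     => unresolved += tp // left for later reconciliation
       }
     }
     (resolved.result(), unresolved.result())
   }
   ```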





[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1170980408


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   It could if the IBP is kept on an old version.
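   
   (For context, a minimal sketch of the assumption here, not PR code: under an old IBP, a topic may predate topic ids and have no entry in the name-to-id map, so resolution returns `None` and the fetch-all path falls into the `UNKNOWN_PARTITION` branch above.)
   
   ```scala
   import org.apache.kafka.common.Uuid
   
   // Hypothetical illustration: name-to-id resolution against the metadata
   // cache can fail for topics that never had an id assigned.
   def resolveTopicId(topicIds: java.util.Map[String, Uuid], topic: String): Option[Uuid] =
     Option(topicIds.get(topic)) // None when the topic predates topic ids
   ```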





[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1171099569


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Hi David, thanks for taking the time to lay out these thoughts. I agree with you that using the zero id does not feel right. It can be considered a misuse of `TopicIdPartition`: an invalid reference to a resource in the cluster. In the worst case, an invalid `TopicIdPartition` could end up being used somewhere else that treats all `TopicIdPartition` instances as if they were valid/resolved. So, there is a clear code smell here.
   
   In any case, the caller of the method `getOffsets` would need to know that some of the `TopicIdPartition` keys are invalid (that is, reference the zero id). This defeats the purpose of returning a map of offsets keyed by topic-id-partition. So, I agree with you that topic-id-partitions should not be exposed in the value returned by `getOffsets`, even though this leaves an asymmetry in the method prototype.
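   
   To make the hazard concrete, here is a hypothetical caller-side guard (not from the PR): once zero-id keys can appear, every consumer of the returned map has to remember to separate sentinel keys from properly resolved ones.
   
   ```scala
   import org.apache.kafka.common.{TopicIdPartition, Uuid}
   
   // Hypothetical sketch: split a result map into entries with resolved
   // topic ids and entries keyed by the zero-id sentinel.
   def splitResolved[V](
     offsets: Map[TopicIdPartition, V]
   ): (Map[TopicIdPartition, V], Map[TopicIdPartition, V]) =
     offsets.partition { case (tip, _) => tip.topicId != Uuid.ZERO_UUID }
   ```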





[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206732981


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   I don't recall the details now. I have to get back to it...







[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1158817715


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -291,24 +291,24 @@ private[group] class GroupCoordinatorAdapter(
     topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
     requireStable: Boolean
   ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
-    val topicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+    val topicIdPartitions = new mutable.ArrayBuffer[TopicIdPartition]()
     topics.forEach { topic =>
       topic.partitionIndexes.forEach { partition =>
-        topicPartitions += new TopicPartition(topic.name, partition)
+        topicIdPartitions += new TopicIdPartition(Uuid.ZERO_UUID, partition, topic.name)

Review Comment:
   I assume this will be replaced with topic.topicId?
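   
   A sketch of what that could look like (hypothetical until [KAFKA-14691](https://issues.apache.org/jira/browse/KAFKA-14691) actually adds a `topicId` field to `OffsetFetchRequestTopics`):
   
   ```scala
   topics.forEach { topic =>
     topic.partitionIndexes.forEach { partition =>
       // Assumes a topic.topicId accessor once the request schema carries ids.
       topicIdPartitions += new TopicIdPartition(topic.topicId, partition, topic.name)
     }
   }
   ```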





[GitHub] [kafka] github-actions[bot] commented on pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13493:
URL: https://github.com/apache/kafka/pull/13493#issuecomment-1696716259

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch).
   
   If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.




[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1210141583


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Sure, Justine. Will get back with a diagram.







[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206702136


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Hi David and Justine, coming back to this, apologies for the delay. Are you comfortable with using `TopicIdPartition` as suggested above, or should we just keep `TopicPartition` and do the resolution to `TopicIdPartition` in `KafkaApis`? What do you think?





[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206991344


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Did we decide that the `TopicIdPartition` would just have a zero id when the request is an old version? Could you give a brief outline of old vs. new request versions and how they are handled (i.e., the in-memory representation while handling, plus what we return in the response for the happy path and error cases)?





[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1170250900


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Were we expecting to use this code path when the IBP is less than 2.8? I guess I assumed that the IBP would be higher.







[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1162869801


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   Hi Justine, thanks for the reply. Apologies, I was OOO. I was referring to exceptional errors, not errors that are functionally expected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1169911272


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   I have been thinking about this part of the code and I don't think that we can treat it like this.
   
   The first thing to note is that we hit this path when all partitions are requested. In this case, it feels weird to return a partition with a zero topic id and `OffsetFetchResponse.UNKNOWN_PARTITION`. I think that we should just ignore unknown partitions in this case. Note that partitions are deleted from the cache when the topic is deleted.
   
   The second thing to note is that we can't ignore partitions with unknown topic ids all the time because the broker may be on an IBP which does not support topic ids. I think that we have to reason about it based on the version of the request.
   
   The third thing, and this one is for the other patch, is that we should consider returning an UNSUPPORTED_VERSION error if the latest version of the API is used but the broker does not support topic ids, especially in the case where all offsets are requested.
   
   All in all, I wonder if we should just return all offsets with zero topic ids here and handle all of that in KafkaApis. @Hangleton What do you think?
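   To sketch the idea (the version boundary, flag and helper names below are illustrative assumptions, not the actual OffsetFetch schema versions):
   
   ```scala
   import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
   import org.apache.kafka.common.protocol.Errors
   
   // Hypothetical sketch: the coordinator returns all offsets keyed by name and
   // KafkaApis decides, per request version, how (or whether) to attach ids.
   val firstTopicIdVersion: Short = 9  // illustrative version boundary only
   
   def convertForVersion(
     version: Short,
     brokerSupportsTopicIds: Boolean,  // e.g. IBP >= 2.8
     offsets: Map[TopicPartition, Long],
     resolveTopicId: String => Option[Uuid]
   ): Either[Errors, Map[TopicIdPartition, Long]] = {
     if (version >= firstTopicIdVersion && !brokerSupportsTopicIds)
       Left(Errors.UNSUPPORTED_VERSION)  // third point above
     else if (version >= firstTopicIdVersion)
       Right(offsets.flatMap { case (tp, offset) =>  // first point: ignore unknown
         resolveTopicId(tp.topic).map(id => new TopicIdPartition(id, tp) -> offset)
       })
     else
       Right(offsets.map { case (tp, offset) =>  // older versions: no topic ids
         new TopicIdPartition(Uuid.ZERO_UUID, tp) -> offset
       })
   }
   ```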



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1206778975


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Sure, that's fine! I will go ahead and start with the approach discussed last time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1158813853


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -291,24 +291,24 @@ private[group] class GroupCoordinatorAdapter(
     topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
     requireStable: Boolean
   ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
-    val topicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+    val topicIdPartitions = new mutable.ArrayBuffer[TopicIdPartition]()
     topics.forEach { topic =>
       topic.partitionIndexes.forEach { partition =>
-        topicPartitions += new TopicPartition(topic.name, partition)
+        topicIdPartitions += new TopicIdPartition(Uuid.ZERO_UUID, partition, topic.name)

Review Comment:
   Is this an intermediate step before we add the actual topic IDs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1171099569


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION
+                }
+              }.toMap

Review Comment:
   Hi David, thanks for taking the time to lay out these thoughts. I agree with you that using the zero id does not feel right. It amounts to misusing `TopicIdPartition` as an invalid reference to a resource in the cluster. In the worst case, an invalid `TopicIdPartition` could end up being used somewhere else that treats every `TopicIdPartition` as valid/resolved. So, there is a clear code smell here.
   
   In any case, the caller of the method `getOffsets` would have to know that some of the `TopicIdPartition` are invalid (that is, reference the zero id). This defeats the purpose of returning a map of offsets keyed by topic-id-partition. So, I agree with you that topic-id-partitions should not be exposed in the value returned by `getOffsets`, even though there will be an asymmetry in the method signature as a result.
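   Concretely, the asymmetric shape would be something along these lines (a signature sketch, not the final form):
   
   ```scala
   // Sketch: topic-id-partitions in, name-keyed partition data out, so that
   // unresolved (zero) topic ids never leak back to callers of getOffsets.
   def getOffsets(
     groupId: String,
     requireStable: Boolean,
     topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]
   ): Map[TopicPartition, OffsetFetchResponse.PartitionData]
   ```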



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac closed pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac closed pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch
URL: https://github.com/apache/kafka/pull/13493


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1165524375


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -492,42 +492,59 @@ class GroupMetadataManager(brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
-    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+  def getOffsets(groupId: String, requireStable: Boolean, topicIdPartitionsOpt: Option[Seq[TopicIdPartition]]): Map[TopicIdPartition, PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicIdPartitionsOpt.getOrElse("all partitions"), groupId))
     val group = groupMetadataCache.get(groupId)
     if (group == null) {
-      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+      topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
         val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
           Optional.empty(), "", Errors.NONE)
-        topicPartition -> partitionData
+        topicIdPartition -> partitionData
       }.toMap
     } else {
       group.inLock {
         if (group.is(Dead)) {
-          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+          topicIdPartitionsOpt.getOrElse(Seq.empty[TopicIdPartition]).map { topicIdPartition =>
             val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
               Optional.empty(), "", Errors.NONE)
-            topicPartition -> partitionData
+            topicIdPartition -> partitionData
           }.toMap
         } else {
-          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
-
-          topicPartitions.map { topicPartition =>
-            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
-              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
+          def resolvePartitionData(topicIdPartition: TopicIdPartition): PartitionData = {
+            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicIdPartition)) {
+              new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                 Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
             } else {
-              val partitionData = group.offset(topicPartition) match {
+              group.offset(topicIdPartition) match {
                 case None =>
                   new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                     Optional.empty(), "", Errors.NONE)
                 case Some(offsetAndMetadata) =>
                   new PartitionData(offsetAndMetadata.offset,
                     offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
-              topicPartition -> partitionData
             }
-          }.toMap
+          }
+
+          topicIdPartitionsOpt match {
+            case Some(topicIdPartitions) =>
+              topicIdPartitions.map { topicIdPartition =>
+                topicIdPartition -> resolvePartitionData(topicIdPartition)
+              }.toMap
+
+            case None =>
+              val topicIds = replicaManager.metadataCache.topicNamesToIds()
+              group.allOffsets.keySet.map { topicPartition =>
+                Option(topicIds.get(topicPartition.topic())) match {
+                  case Some(topicId) =>
+                    val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
+                    topicIdPartition -> resolvePartitionData(topicIdPartition)
+                  case None =>
+                    val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
+                    zeroIdPartition -> OffsetFetchResponse.UNKNOWN_PARTITION

Review Comment:
   This will be replaced with `UNKNOWN_TOPIC_ID` when the OffsetFetch request and response schemas are updated.
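   In that case, the fallback branch above would presumably become something like the following (hypothetical until the schema change lands):
   
   ```scala
   case None =>
     // Hypothetical: once OffsetFetch carries topic ids, surface a retriable
     // UNKNOWN_TOPIC_ID instead of UNKNOWN_PARTITION for unresolvable topics.
     val zeroIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, topicPartition)
     zeroIdPartition -> new OffsetFetchResponse.PartitionData(
       OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNKNOWN_TOPIC_ID)
   ```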



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13493:
URL: https://github.com/apache/kafka/pull/13493#issuecomment-1741511409

   Closing this PR for now as the topic id work will be done later. We can re-open it when we resume the work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1158816855


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   Where will the topic ID come from here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1158947529


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   What cases are we considering? A topic should have an ID as long as the controller is on IBP 2.8+.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1158907492


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -291,24 +291,24 @@ private[group] class GroupCoordinatorAdapter(
     topics: util.List[OffsetFetchRequestData.OffsetFetchRequestTopics],
     requireStable: Boolean
   ): CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]] = {
-    val topicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+    val topicIdPartitions = new mutable.ArrayBuffer[TopicIdPartition]()
     topics.forEach { topic =>
       topic.partitionIndexes.forEach { partition =>
-        topicPartitions += new TopicPartition(topic.name, partition)
+        topicIdPartitions += new TopicIdPartition(Uuid.ZERO_UUID, partition, topic.name)

Review Comment:
   Yes, you are right. The topic id will come from `OffsetFetchRequestTopics`.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   Until topic ids are persisted in the group metadata, we won't have the topic id unless it is resolved via the metadata cache. I shouldn't have used the zero id here, thanks for pointing it out!
   
   If we want to maintain the mapping performed in `getOffsets`, which uses `TopicIdPartition` as keys, we could instead resolve the topic id from the topic name, as the `KafkaApis` request handler does for topic-id-aware requests before they are processed. But then we would need to consider the cases where an id cannot be found. I will think about it a bit more.
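   For example, the up-front resolution could look roughly like this (a sketch; the caller would decide how to report the unresolved topics):
   
   ```scala
   import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
   
   // Sketch: resolve names to ids before calling getOffsets, mirroring what
   // KafkaApis does for topic-id-aware requests; unresolved topics come back
   // separately so the caller can map them to an appropriate error.
   def resolveAll(
     partitions: Seq[TopicPartition],
     topicIds: String => Option[Uuid]
   ): (Seq[TopicIdPartition], Seq[TopicPartition]) = {
     val (resolved, unresolved) = partitions
       .map(tp => topicIds(tp.topic).map(id => new TopicIdPartition(id, tp)).toRight(tp))
       .partition(_.isRight)
     (resolved.collect { case Right(tip) => tip },
      unresolved.collect { case Left(tp) => tp })
   }
   ```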



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1163210151


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   No worries :) I think for other protocols we return UNKNOWN_TOPIC_ID in KafkaApis when doing the conversion, which is typically retriable. But I would have to look into the details here. It might not be as simple, since KafkaApis is a place where we can easily return an error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org