Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/02/13 16:41:04 UTC

[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1104738082


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -59,6 +62,14 @@ public OffsetCommitRequest build(short version) {
                 throw new UnsupportedVersionException("The broker offset commit protocol version " +
                         version + " does not support usage of config group.instance.id.");
             }
+            if (version >= 9) {
+                data.topics().forEach(topic -> {
+                    // Set the topic name to null if a topic ID for the topic is present.
+                    if (!Uuid.ZERO_UUID.equals(topic.topicId())) {

Review Comment:
   Note: the topic ID must not be null for a request/response of version >= 9 to be serialized. `ZERO_UUID` means that no topic ID was specified.
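
A minimal sketch of the sentinel convention described in the comment above, assuming a simplified stand-in for the request topic entry. `TopicEntrySketch`, `hasTopicId` and `prepareForVersion` are hypothetical names for illustration, and `java.util.UUID` stands in for Kafka's `Uuid`; the clearing of the name follows the code comment in the hunk, not the elided body of the `if`.

```java
import java.util.UUID;

// Hypothetical stand-in for a request topic entry; this is not Kafka's generated class.
final class TopicEntrySketch {
    // Stand-in for Uuid.ZERO_UUID: all bits zero means "no topic ID specified".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    final UUID topicId; // never null; the sentinel is used instead
    String name;        // may be cleared when a topic ID is present

    TopicEntrySketch(UUID topicId, String name) {
        if (topicId == null) {
            throw new IllegalArgumentException("topicId must not be null; use ZERO_UUID");
        }
        this.topicId = topicId;
        this.name = name;
    }

    // True when the entry carries a real topic ID rather than the sentinel.
    static boolean hasTopicId(TopicEntrySketch topic) {
        return !ZERO_UUID.equals(topic.topicId);
    }

    // For version >= 9, drop the name when a topic ID is present, as the hunk's comment describes.
    static void prepareForVersion(TopicEntrySketch topic, short version) {
        if (version >= 9 && hasTopicId(topic)) {
            topic.name = null;
        }
    }
}
```

Comparing against the sentinel rather than against `null` keeps the entry serializable in every case, because the ID field always holds a value.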



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -425,35 +425,72 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val topicNames =
+        if (offsetCommitRequest.version() >= 9)
+          metadataCache.topicIdsToNames()
+        else
+          Collections.emptyMap[Uuid, String]()
+
+      // For version < 9, lookup from topicNames fails and the topic name (which cannot be null) is returned.
+      // For version >= 9, if lookup from topicNames fails, there are two possibilities:
+      //
+      // a) The topic ID was left to default and the topic name should have been populated as a fallback instead.
+      //    If none was provided, null is returned.
+      //
+      // b) The topic ID was not default but is not present in the local topic IDs cache. In this case, because
+      //    clients should make exclusive use of topic name or topic ID, the topic name should be null. If however
+      //    the client provided a topic name, we do not want to use it, because any topic with the same name
+      //    present locally would then have a topic ID which does not match the topic ID in the request.
+      def resolveTopicName(topic: OffsetCommitRequestData.OffsetCommitRequestTopic): String = {
+          val resolvedFromId = topicNames.get(topic.topicId())
+          if (resolvedFromId != null)
+            resolvedFromId
+          else if (offsetCommitRequest.version() < 9 || Uuid.ZERO_UUID.equals(topic.topicId)) {
+            topic.name()
+          } else {
+            null
+          }
+      }
+
       val authorizedTopics = authHelper.filterByAuthorized(
         request.context,
         READ,
         TOPIC,
-        offsetCommitRequest.data.topics.asScala
-      )(_.name)
+        offsetCommitRequest.data.topics.asScala.filter(topic => resolveTopicName(topic) != null)
+      )(resolveTopicName)
 
       val responseBuilder = new OffsetCommitResponse.Builder()
       val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
       offsetCommitRequest.data.topics.forEach { topic =>
-        if (!authorizedTopics.contains(topic.name)) {
+        val topicName = resolveTopicName(topic)
+        if (topicName == null) {

Review Comment:
   The validation order chosen here mirrors the one used on the fetch request path:
   
   - If topic IDs are used and the given topic ID cannot be resolved (and no fallback name is provided), send back `UNKNOWN_TOPIC_ID`;
   - If the topic name resolves but is not authorized, send `TOPIC_AUTHORIZATION_FAILED`;
   - If the topic name is authorized but not present in the metadata cache, send `UNKNOWN_TOPIC_OR_PARTITION`. In this case the name cannot have been resolved from a topic ID, because a successful ID lookup implies the topic is in the cache.
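
The three checks above can be sketched as a single function. This is an illustration in Java rather than the Scala of `KafkaApis`, and everything in it is an assumption made for the example: the `Outcome` enum, the `validate` signature, and the plain maps and sets standing in for the metadata cache and the authorizer. `resolveTopicName` follows the logic shown in the hunk.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of the validation order; not the broker's actual code.
final class OffsetCommitValidationSketch {
    enum Outcome { UNKNOWN_TOPIC_ID, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_OR_PARTITION, OK }

    // Stand-in for Uuid.ZERO_UUID ("no topic ID specified").
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Same branches as resolveTopicName in the hunk: ID lookup first, then name fallback.
    static String resolveTopicName(short version, UUID topicId, String name,
                                   Map<UUID, String> idsToNames) {
        String resolvedFromId = version >= 9 ? idsToNames.get(topicId) : null;
        if (resolvedFromId != null) {
            return resolvedFromId;
        }
        if (version < 9 || ZERO_UUID.equals(topicId)) {
            return name; // may be null for version >= 9 when no fallback name was sent
        }
        return null; // non-default ID that is unknown locally: ignore any provided name
    }

    static Outcome validate(short version, UUID topicId, String name,
                            Map<UUID, String> idsToNames,
                            Set<String> authorizedTopics,
                            Set<String> topicsInMetadataCache) {
        String resolved = resolveTopicName(version, topicId, name, idsToNames);
        if (resolved == null) {
            return Outcome.UNKNOWN_TOPIC_ID;
        }
        if (!authorizedTopics.contains(resolved)) {
            return Outcome.TOPIC_AUTHORIZATION_FAILED;
        }
        if (!topicsInMetadataCache.contains(resolved)) {
            return Outcome.UNKNOWN_TOPIC_OR_PARTITION;
        }
        return Outcome.OK;
    }
}
```

Checking authorization before existence means a client without access to a topic receives the same error whether or not the topic exists.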



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -192,7 +196,15 @@ public Builder merge(
             return this;
         }
 
-        public OffsetCommitResponse build() {
+        public OffsetCommitResponse build(short version) {
+            if (version >= 9) {
+                data.topics().forEach(topic -> {
+                    // Set the topic name to null if a topic ID for the topic is present.
+                    if (!Uuid.ZERO_UUID.equals(topic.topicId())) {

Review Comment:
   Note: the topic ID must not be null for a request/response of version >= 9 to be serialized. `ZERO_UUID` means that no topic ID was specified.



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -78,11 +89,18 @@ public OffsetCommitRequestData data() {
         return data;
     }
 
-    public Map<TopicPartition, Long> offsets() {
+    public Map<TopicPartition, Long> offsets(TopicResolver topicResolver) {
         Map<TopicPartition, Long> offsets = new HashMap<>();
         for (OffsetCommitRequestTopic topic : data.topics()) {
+            String topicName = topic.name();
+
+            if (version() >= 9 && topicName == null) {

Review Comment:
   Note that if `topicName` is not null, we should also check whether it resolves to the same UUID as the one cached locally.
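
One possible shape for the cross-check suggested above, as a sketch only. `TopicNameIdConsistencySketch`, `isConsistent` and the name-to-ID map are hypothetical and not part of the PR; treating a name that is unknown locally as "no mismatch detected" is an assumption of this example, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical helper; the PR does not define this class.
final class TopicNameIdConsistencySketch {
    // Stand-in for Uuid.ZERO_UUID ("no topic ID specified").
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Returns false only when the name is known locally under a different topic ID.
    static boolean isConsistent(String topicName, UUID requestTopicId,
                                Map<String, UUID> localNamesToIds) {
        if (topicName == null || ZERO_UUID.equals(requestTopicId)) {
            return true; // only one of name/ID was provided, nothing to cross-check
        }
        UUID cachedId = localNamesToIds.get(topicName);
        // Assumption: a name absent from the local cache is left to later validation.
        return cachedId == null || cachedId.equals(requestTopicId);
    }
}
```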


