Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/17 23:12:40 UTC

[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r671754420



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,36 +250,65 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
         private final boolean copySessionPartitions;
+        private int partitionsWithoutTopicIds = 0;
 
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
             this.copySessionPartitions = true;
         }
 
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
+        public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
             next.put(topicPartition, data);
+            // topicIds should not change between adding partitions and building, so we can use putIfAbsent
+            if (!topicId.equals(Uuid.ZERO_UUID)) {
+                topicIds.putIfAbsent(topicPartition.topic(), topicId);
+            } else {
+                partitionsWithoutTopicIds++;
+            }
         }
 
         public FetchRequestData build() {
+            // For incremental sessions we don't have to worry about removed partitions when using topic IDs because we will either
+            //   a) already be using topic IDs and have the ID in the session
+            //   b) not be using topic IDs before and will close the session upon trying to use them

Review comment:
       Hi, I'm tracking this bug here and will work on the fix: https://issues.apache.org/jira/browse/KAFKA-13079



