You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/12 13:48:15 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11331: KAFKA-13111: Re-evaluate Fetch Sessions when using topic IDs

dajac commented on a change in pull request #11331:
URL: https://github.com/apache/kafka/pull/11331#discussion_r727155822



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -249,115 +215,126 @@ public String toString() {
          * Another reason is because we make use of the list ordering to optimize the preparation of
          * incremental fetch requests (see below).
          */
-        private LinkedHashMap<TopicPartition, PartitionData> next;
-        private Map<String, Uuid> topicIds;
+        private LinkedHashMap<TopicIdPartition, PartitionData> next;
         private final boolean copySessionPartitions;
         private int partitionsWithoutTopicIds = 0;
+        private int partitionsWithTopicIds = 0;
 
         Builder() {
             this.next = new LinkedHashMap<>();
-            this.topicIds = new HashMap<>();
             this.copySessionPartitions = true;
         }
 
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
-            this.topicIds = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
-            next.put(topicPartition, data);
-            // topicIds should not change between adding partitions and building, so we can use putIfAbsent
-            if (!topicId.equals(Uuid.ZERO_UUID)) {
-                topicIds.putIfAbsent(topicPartition.topic(), topicId);
-            } else {
+        public void add(TopicIdPartition topicIdPartition, PartitionData data) {
+            next.put(topicIdPartition, data);
+            if (topicIdPartition.topicId().equals(Uuid.ZERO_UUID)) {
                 partitionsWithoutTopicIds++;
+            } else {
+                partitionsWithTopicIds++;
+            }
+        }
+
+        private Map<TopicIdPartition, PartitionData> buildFullSession(boolean canUseTopicIds) {
+            if (log.isDebugEnabled()) {
+                log.debug("Built full fetch {} for node {} with {}.",
+                        nextMetadata, node, partitionsToLogString(next.keySet()));
             }
+            sessionPartitions = next;
+            next = null;
+            // Only add topic IDs to the session if we are using topic IDs.
+            sessionTopicNames = new HashMap<>();
+            if (canUseTopicIds) {
+                Map<Uuid, Set<String>> newTopicNames = sessionPartitions.keySet().stream().collect(Collectors.groupingByConcurrent(TopicIdPartition::topicId,
+                        Collectors.mapping(topicIdPartition -> topicIdPartition.topicPartition().topic(), Collectors.toSet())));
+
+                sessionTopicNames = new HashMap<>(newTopicNames.size());
+                // There should only be one topic name per topic ID.
+                newTopicNames.forEach((topicId, topicNamesSet) -> topicNamesSet.forEach(topicName -> sessionTopicNames.put(topicId, topicName)));
+            } else {
+                sessionTopicNames = new HashMap<>();
+            }
+            return Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
         }
 
         public FetchRequestData build() {
             boolean canUseTopicIds = partitionsWithoutTopicIds == 0;
 
             if (nextMetadata.isFull()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Built full fetch {} for node {} with {}.",
-                              nextMetadata, node, partitionsToLogString(next.keySet()));
-                }
-                sessionPartitions = next;
-                next = null;
-                // Only add topic IDs to the session if we are using topic IDs.
-                if (canUseTopicIds) {
-                    sessionTopicIds = topicIds;
-                    sessionTopicNames = new HashMap<>(topicIds.size());
-                    topicIds.forEach((name, id) -> sessionTopicNames.put(id, name));
-                } else {
-                    sessionTopicIds = new HashMap<>();
-                    sessionTopicNames = new HashMap<>();
-                }
-                topicIds = null;
-                Map<TopicPartition, PartitionData> toSend =
-                    Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
-                Map<String, Uuid> toSendTopicIds =
-                    Collections.unmodifiableMap(new HashMap<>(sessionTopicIds));
-                Map<Uuid, String> toSendTopicNames =
-                    Collections.unmodifiableMap(new HashMap<>(sessionTopicNames));
-                return new FetchRequestData(toSend, Collections.emptyList(), toSend, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+                Map<TopicIdPartition, PartitionData> toSend = buildFullSession(canUseTopicIds);
+                return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata, canUseTopicIds);
             }
 
-            List<TopicPartition> added = new ArrayList<>();
-            List<TopicPartition> removed = new ArrayList<>();
-            List<TopicPartition> altered = new ArrayList<>();
-            for (Iterator<Entry<TopicPartition, PartitionData>> iter =
+            // If we were previously using a session without IDs and an ID was added to the builder, we will close the current session and open a new one with IDs.
+            // Same if vice versa.
+            boolean closeSessionDueToTopicIdChange = (requestUsedTopicIds && partitionsWithoutTopicIds > 0) || (!requestUsedTopicIds && partitionsWithTopicIds > 0);
+
+            if (closeSessionDueToTopicIdChange) {
+                canUseTopicIds = partitionsWithTopicIds > 0;
+                Map<TopicIdPartition, PartitionData> toSend = buildFullSession(canUseTopicIds);
+                if (canUseTopicIds && partitionsWithoutTopicIds == 0 || !canUseTopicIds && partitionsWithTopicIds == 0)
+                    return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata.nextCloseExisting(), !requestUsedTopicIds);
+                Map<TopicIdPartition, PartitionData> emptyMap = new LinkedHashMap<>();
+                return new FetchRequestData(emptyMap, Collections.emptyList(), emptyMap, nextMetadata.closeExisting(), !requestUsedTopicIds);
+            }
+
+            List<TopicIdPartition> added = new ArrayList<>();
+            List<TopicIdPartition> removed = new ArrayList<>();
+            List<TopicIdPartition> altered = new ArrayList<>();
+            for (Iterator<Entry<TopicIdPartition, PartitionData>> iter =
                      sessionPartitions.entrySet().iterator(); iter.hasNext(); ) {
-                Entry<TopicPartition, PartitionData> entry = iter.next();
-                TopicPartition topicPartition = entry.getKey();
+                Entry<TopicIdPartition, PartitionData> entry = iter.next();
+                TopicIdPartition topicIdPartition = entry.getKey();
                 PartitionData prevData = entry.getValue();
-                PartitionData nextData = next.remove(topicPartition);
+                PartitionData nextData = next.remove(topicIdPartition);
                 if (nextData != null) {
                     if (!prevData.equals(nextData)) {
                         // Re-add the altered partition to the end of 'next'
-                        next.put(topicPartition, nextData);
+                        next.put(topicIdPartition, nextData);
                         entry.setValue(nextData);
-                        altered.add(topicPartition);
+                        altered.add(topicIdPartition);
                     }
                 } else {
                     // Remove this partition from the session.
                     iter.remove();
                     // Indicate that we no longer want to listen to this partition.
-                    removed.add(topicPartition);
+                    removed.add(topicIdPartition);

Review comment:
       @jolshan I have been looking at the changes in the `FetchSessionHandler` as well at the changes in the related classes. I am a bit worried by two things: 1) the `FetchSessionHandler` is quite complicated now, at least a bit more than before; and 2) the reliance on the request version is spread in many places now.
   
   It seems that we could get away with a simpler solution which, I think, cover all the cases as well. At the moment in the `FetchSessionHandler`, we track the `added`, `removed` and `altered` partitions and the `FetchRequest` is constructed based `next` (`added` + `altered`) and `removed`. Now imagine that we would track another list `replaced` (or `upgraded`...). We would add a partition to this list when we detect that the topic id of the partition in `next` is different from the one in the session. Then, we would pass that new list to the `FetchRequestBuilder` as well. In the builder, we would add it to the forgotten set if version >= 13 or ignore it otherwise.
   
   I have tried to implement this based on `trunk`: https://github.com/apache/kafka/commit/a1de3910ddb9b64d0890dfd61a2e8263f2aa4864. I think that we should be able to do something similar based on your version which uses `TopicIdPartition`.
   
   The pros is that the version handling remains in the `FetchRequest` class. The cons is that it does not allow to restart the session immediately without doing a round-trip to the broker, which is not a big deal as this could only happen during the upgrade.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org