Posted to jira@kafka.apache.org by "showuon (via GitHub)" <gi...@apache.org> on 2023/02/21 09:23:34 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

showuon commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1112773897


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
-        for (TopicImage topicImage : topicsById.values()) {
-            topicImage.write(writer, options);
+        for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+            entry.getValue().write(writer, options);

Review Comment:
   Why can't we use `topicsById.values()` as before?



##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -38,15 +40,21 @@
  */
 public final class TopicsImage {
     public static final TopicsImage EMPTY =
-        new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+        new TopicsImage(map(), map());
+
+    final ImMap<Uuid, TopicImage> topicsById;
+    final ImMap<String, TopicImage> topicsByName;
 
-    private final Map<Uuid, TopicImage> topicsById;
-    private final Map<String, TopicImage> topicsByName;
+    public TopicsImage(ImMap<Uuid, TopicImage> topicsById,
+                       ImMap<String, TopicImage> topicsByName) {
+        this.topicsById = topicsById;
+        this.topicsByName = topicsByName;
+    }
 
-    public TopicsImage(Map<Uuid, TopicImage> topicsById,
-                       Map<String, TopicImage> topicsByName) {
-        this.topicsById = Collections.unmodifiableMap(topicsById);
-        this.topicsByName = Collections.unmodifiableMap(topicsByName);
+    public TopicsImage including(TopicImage topic) {

Review Comment:
   It looks like this is only used in tests. Could it be given `protected` scope instead?



##########
metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java:
##########
@@ -126,29 +127,27 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) {
     }
 
     public TopicsImage apply() {
-        Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
-        Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());
-        for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
-            Uuid id = entry.getKey();
-            TopicImage prevTopicImage = entry.getValue();
-            TopicDelta delta = changedTopics.get(id);
-            if (delta == null) {
-                if (!deletedTopicIds.contains(id)) {
-                    newTopicsById.put(id, prevTopicImage);
-                    newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
-                }
+        ImMap<Uuid, TopicImage> newTopicsById = image.topicsById;
+        ImMap<String, TopicImage> newTopicsByName = image.topicsByName;
+        // apply all the deletes
+        for (Uuid topicId: deletedTopicIds) {
+            // it was deleted, so we have to remove it from the maps
+            TopicImage originalTopicToBeDeleted = image.topicsById.get(topicId);
+            if (originalTopicToBeDeleted == null) {
+                throw new IllegalStateException("Missing topic id " + topicId);
             } else {
-                TopicImage newTopicImage = delta.apply();
-                newTopicsById.put(id, newTopicImage);
-                newTopicsByName.put(delta.name(), newTopicImage);
+                newTopicsById = newTopicsById.without(topicId);
+                newTopicsByName = newTopicsByName.without(originalTopicToBeDeleted.name());
             }
         }
-        for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
-            if (!newTopicsById.containsKey(entry.getKey())) {
-                TopicImage newTopicImage = entry.getValue().apply();
-                newTopicsById.put(newTopicImage.id(), newTopicImage);
-                newTopicsByName.put(newTopicImage.name(), newTopicImage);
-            }
+        // apply all the updates/additions
+        for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+            // put new information into the maps
+            String topicName = newTopicToBeAddedOrUpdated.name();
+            newTopicsById = newTopicsById.assoc(topicId, newTopicToBeAddedOrUpdated);
+            newTopicsByName = newTopicsByName.assoc(topicName, newTopicToBeAddedOrUpdated);

Review Comment:
   From the [javadoc](https://javadoc.io/doc/org.organicdesign/Paguro/latest/org/organicdesign/fp/collections/ImMap.html#assoc(K,V)), I'm not sure whether `assoc` replaces the existing value when the key is already present, rather than adding a second entry for it. Could you confirm that?
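
   To make the question concrete, here is a minimal sketch of the contract that `apply()` appears to rely on. It does not use Paguro: `CowMap` is a hypothetical copy-on-write stand-in written only for illustration, and whether `ImMap.assoc` behaves the same way is exactly what needs confirming.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AssocSketch {
    /**
     * Hypothetical stand-in for a persistent map. This is NOT Paguro's ImMap;
     * it copies the whole map on every change, which a real persistent map avoids.
     */
    static final class CowMap<K, V> {
        private final Map<K, V> data;

        private CowMap(Map<K, V> data) {
            this.data = Collections.unmodifiableMap(data);
        }

        static <K, V> CowMap<K, V> empty() {
            return new CowMap<>(new HashMap<>());
        }

        /** Returns a new map; an existing key gets its value replaced. */
        CowMap<K, V> assoc(K key, V value) {
            Map<K, V> copy = new HashMap<>(data);
            copy.put(key, value);
            return new CowMap<>(copy);
        }

        /** Returns a new map with the key removed; the receiver is unchanged. */
        CowMap<K, V> without(K key) {
            Map<K, V> copy = new HashMap<>(data);
            copy.remove(key);
            return new CowMap<>(copy);
        }

        V get(K key) {
            return data.get(key);
        }

        int size() {
            return data.size();
        }
    }

    public static void main(String[] args) {
        CowMap<String, String> v1 =
            CowMap.<String, String>empty().assoc("topic-a", "image-1");
        // Same key again: the update case from the changedTopics loop.
        CowMap<String, String> v2 = v1.assoc("topic-a", "image-2");

        System.out.println(v1.get("topic-a") + " size=" + v1.size());
        System.out.println(v2.get("topic-a") + " size=" + v2.size());
    }
}
```

   If `assoc` follows this contract, an updated topic replaces its old entry in the new map and the previous image stays untouched. If it does not, the update loop would need an explicit `without` before each `assoc`.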


