You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/09/08 20:37:23 UTC

[kafka] 01/01: KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch KAFKA-13280
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8f2c2316923d4e03959bfb58cfbb218a0390ba68
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Wed Sep 8 13:35:08 2021 -0700

    KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds
    
    Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and
    KRaftMetadataCache#topicIdsToNames by returning a map subclass that
    exposes the TopicsImage data structures without copying them.
---
 .../kafka/server/metadata/KRaftMetadataCache.scala |  11 +-
 .../java/org/apache/kafka/image/TopicsImage.java   | 148 +++++++++++++++++++++
 .../org/apache/kafka/image/TopicsImageTest.java    |  62 +++++++--
 3 files changed, 204 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index b7fbd17..1ff7a80 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -236,18 +236,13 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
       map(topic => topic.partitions().size())
   }
 
-  override def topicNamesToIds(): util.Map[String, Uuid] = {
-    _currentImage.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava
-  }
+  override def topicNamesToIds(): util.Map[String, Uuid] = _currentImage.topics.topicNameToIdView()
 
-  override def topicIdsToNames(): util.Map[Uuid, String] = {
-    _currentImage.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava
-  }
+  override def topicIdsToNames(): util.Map[Uuid, String] = _currentImage.topics.topicIdToNameView()
 
   override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
     val image = _currentImage
-    (image.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava,
-      image.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava)
+    (image.topics.topicNameToIdView(), image.topics.topicIdToNameView())
   }
 
   // if the leader is not known, return None;
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
index 59b31a7..db8dc70 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
@@ -21,10 +21,16 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
+import java.util.AbstractMap;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.AbstractSet;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -92,6 +98,148 @@ public final class TopicsImage {
         return Objects.hash(topicsById, topicsByName);
     }
 
+    /**
+     * Expose a view of this TopicsImage as a map from topic names to IDs.
+     *
+     * Like TopicsImage itself, this map is immutable.
+     */
+    public Map<String, Uuid> topicNameToIdView() {
+        return new TopicNameToIdMap();
+    }
+
+    class TopicNameToIdMap extends AbstractMap<String, Uuid> {
+        private final TopicNameToIdMapEntrySet set = new TopicNameToIdMapEntrySet();
+
+        @Override
+        public boolean containsKey(Object key) {
+            return topicsByName.containsKey(key);
+        }
+
+        @Override
+        public Uuid get(Object key) {
+            TopicImage image = topicsByName.get(key);
+            if (image == null) return null;
+            return image.id();
+        }
+
+        @Override
+        public Set<Entry<String, Uuid>> entrySet() {
+            return set;
+        }
+    }
+
+    class TopicNameToIdMapEntrySet extends AbstractSet<Entry<String, Uuid>> {
+        @Override
+        public Iterator<Entry<String, Uuid>> iterator() {
+            return new TopicNameToIdMapEntrySetIterator(topicsByName.entrySet().iterator());
+        }
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public boolean contains(Object o) {
+            if (!(o instanceof Entry)) return false;
+            Entry other = (Entry) o;
+            TopicImage image = topicsByName.get(other.getKey());
+            if (image == null) return false;
+            return image.id().equals(other.getValue());
+        }
+
+        @Override
+        public int size() {
+            return topicsByName.size();
+        }
+    }
+
+    static class TopicNameToIdMapEntrySetIterator implements Iterator<Entry<String, Uuid>> {
+        private final Iterator<Entry<String, TopicImage>> iterator;
+
+        TopicNameToIdMapEntrySetIterator(Iterator<Entry<String, TopicImage>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.iterator.hasNext();
+        }
+
+        @Override
+        public Entry<String, Uuid> next() {
+            Entry<String, TopicImage> entry = iterator.next();
+            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue().id());
+        }
+    }
+
+    /**
+     * Expose a view of this TopicsImage as a map from IDs to names.
+     *
+     * Like TopicsImage itself, this map is immutable.
+     */
+    public Map<Uuid, String> topicIdToNameView() {
+        return new TopicIdToNameMap();
+    }
+
+    class TopicIdToNameMap extends AbstractMap<Uuid, String> {
+        private final TopicIdToNameMapEntrySet set = new TopicIdToNameMapEntrySet();
+
+        @Override
+        public boolean containsKey(Object key) {
+            return topicsById.containsKey(key);
+        }
+
+        @Override
+        public String get(Object key) {
+            TopicImage image = topicsById.get(key);
+            if (image == null) return null;
+            return image.name();
+        }
+
+        @Override
+        public Set<Entry<Uuid, String>> entrySet() {
+            return set;
+        }
+    }
+
+    class TopicIdToNameMapEntrySet extends AbstractSet<Entry<Uuid, String>> {
+        @Override
+        public Iterator<Entry<Uuid, String>> iterator() {
+            return new TopicIdToNameEntrySetIterator(topicsById.entrySet().iterator());
+        }
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public boolean contains(Object o) {
+            if (!(o instanceof Entry)) return false;
+            Entry other = (Entry) o;
+            TopicImage image = topicsById.get(other.getKey());
+            if (image == null) return false;
+            return image.name().equals(other.getValue());
+        }
+
+        @Override
+        public int size() {
+            return topicsById.size();
+        }
+    }
+
+    static class TopicIdToNameEntrySetIterator implements Iterator<Entry<Uuid, String>> {
+        private final Iterator<Entry<Uuid, TopicImage>> iterator;
+
+        TopicIdToNameEntrySetIterator(Iterator<Entry<Uuid, TopicImage>> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.iterator.hasNext();
+        }
+
+        @Override
+        public Entry<Uuid, String> next() {
+            Entry<Uuid, TopicImage> entry = iterator.next();
+            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue().name());
+        }
+    }
+
     @Override
     public String toString() {
         return "TopicsImage(topicsById=" + topicsById.entrySet().stream().
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 1b5fbef..91cdd5b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -38,12 +38,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
@@ -83,16 +87,22 @@ public class TopicsImageTest {
         return map;
     }
 
+    private static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA");
+
+    private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+
+    private static final Uuid BAZ_UUID = Uuid.fromString("tgHBnRglT5W_RlENnuG5vg");
+
     static {
         TOPIC_IMAGES1 = Arrays.asList(
-            newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
+            newTopicImage("foo", FOO_UUID,
                 new PartitionRegistration(new int[] {2, 3, 4},
                     new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
                 new PartitionRegistration(new int[] {3, 4, 5},
                     new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684),
                 new PartitionRegistration(new int[] {2, 4, 5},
                     new int[] {2, 4, 5}, Replicas.NONE, Replicas.NONE, 2, 10, 84)),
-            newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
+            newTopicImage("bar", BAR_UUID,
                 new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
                     new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345)));
 
@@ -100,18 +110,18 @@ public class TopicsImageTest {
 
         DELTA1_RECORDS = new ArrayList<>();
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord().
-            setTopicId(Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA")),
+            setTopicId(FOO_UUID),
             REMOVE_TOPIC_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionChangeRecord().
-            setTopicId(Uuid.fromString("f62ptyETTjet8SL5ZeREiw")).
+            setTopicId(BAR_UUID).
             setPartitionId(0).setLeader(1),
             PARTITION_CHANGE_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord().
-            setName("baz").setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")),
+            setName("baz").setTopicId(BAZ_UUID),
             TOPIC_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionRecord().
             setPartitionId(0).
-            setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")).
+            setTopicId(BAZ_UUID).
             setReplicas(Arrays.asList(1, 2, 3, 4)).
             setIsr(Arrays.asList(3, 4)).
             setRemovingReplicas(Collections.singletonList(2)).
@@ -124,10 +134,10 @@ public class TopicsImageTest {
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
 
         List<TopicImage> topics2 = Arrays.asList(
-            newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
+            newTopicImage("bar", BAR_UUID,
                 new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
                     new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 1, 2, 346)),
-            newTopicImage("baz", Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"),
+            newTopicImage("baz", BAZ_UUID,
                 new PartitionRegistration(new int[] {1, 2, 3, 4},
                     new int[] {3, 4}, new int[] {2}, new int[] {1}, 3, 2, 1)));
         IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
@@ -177,7 +187,7 @@ public class TopicsImageTest {
             new ApiMessageAndVersion(
                 new PartitionRecord()
                     .setPartitionId(1)
-                    .setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+                    .setTopicId(BAZ_UUID)
                     .setReplicas(Arrays.asList(4, 2, localId))
                     .setIsr(Arrays.asList(4, 2, localId))
                     .setLeader(4)
@@ -375,4 +385,38 @@ public class TopicsImageTest {
         TopicsImage nextImage = delta.apply();
         assertEquals(image, nextImage);
     }
+
+    @Test
+    public void testTopicNameToIdView() {
+        Map<String, Uuid> map = IMAGE1.topicNameToIdView();
+        assertTrue(map.containsKey("foo"));
+        assertEquals(FOO_UUID, map.get("foo"));
+        assertTrue(map.containsKey("bar"));
+        assertEquals(BAR_UUID, map.get("bar"));
+        assertFalse(map.containsKey("baz"));
+        assertEquals(null, map.get("baz"));
+        HashSet<Uuid> uuids = new HashSet<>();
+        map.values().iterator().forEachRemaining(u -> uuids.add(u));
+        HashSet<Uuid> expectedUuids = new HashSet<>(Arrays.asList(
+            Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
+            Uuid.fromString("f62ptyETTjet8SL5ZeREiw")));
+        assertEquals(expectedUuids, uuids);
+        assertThrows(UnsupportedOperationException.class, () -> map.remove("foo"));
+    }
+
+    @Test
+    public void testTopicIdToNameView() {
+        Map<Uuid, String> map = IMAGE1.topicIdToNameView();
+        assertTrue(map.containsKey(FOO_UUID));
+        assertEquals("foo", map.get(FOO_UUID));
+        assertTrue(map.containsKey(BAR_UUID));
+        assertEquals("bar", map.get(BAR_UUID));
+        assertFalse(map.containsKey(BAZ_UUID));
+        assertEquals(null, map.get(BAZ_UUID));
+        HashSet<String> names = new HashSet<>();
+        map.values().iterator().forEachRemaining(n -> names.add(n));
+        HashSet<String> expectedNames = new HashSet<>(Arrays.asList("foo", "bar"));
+        assertEquals(expectedNames, names);
+        assertThrows(UnsupportedOperationException.class, () -> map.remove(FOO_UUID));
+    }
 }