You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/10/07 16:45:01 UTC

[kafka] branch trunk updated: KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds (#11311)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f3a0e0  KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds (#11311)
3f3a0e0 is described below

commit 3f3a0e0d9ee1142d47913e744bfd57ebbc5e5b40
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Thu Oct 7 09:41:57 2021 -0700

    KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds (#11311)
    
    Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and
    KRaftMetadataCache#topicIdsToNames by returning a map subclass that
    exposes the TopicsImage data structures without copying them.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 checkstyle/import-control.xml                      |   1 +
 .../kafka/server/metadata/KRaftMetadataCache.scala |  11 +-
 .../java/org/apache/kafka/image/TopicsImage.java   |  19 ++++
 .../org/apache/kafka/image/TopicsImageTest.java    |  63 +++++++++--
 .../kafka/server/util/TranslatedValueMapView.java  | 117 +++++++++++++++++++++
 .../server/util/TranslatedValueMapViewTest.java    | 109 +++++++++++++++++++
 6 files changed, 303 insertions(+), 17 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 28ced52..ba3087b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -240,6 +240,7 @@
     <allow pkg="org.apache.kafka.common.requests" />
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.util" />
   </subpackage>
 
   <subpackage name="metadata">
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index b7fbd17..1ff7a80 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -236,18 +236,13 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
       map(topic => topic.partitions().size())
   }
 
-  override def topicNamesToIds(): util.Map[String, Uuid] = {
-    _currentImage.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava
-  }
+  override def topicNamesToIds(): util.Map[String, Uuid] = _currentImage.topics.topicNameToIdView()
 
-  override def topicIdsToNames(): util.Map[Uuid, String] = {
-    _currentImage.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava
-  }
+  override def topicIdsToNames(): util.Map[Uuid, String] = _currentImage.topics.topicIdToNameView()
 
   override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
     val image = _currentImage
-    (image.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava,
-      image.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava)
+    (image.topics.topicNameToIdView(), image.topics.topicIdToNameView())
   }
 
   // if the leader is not known, return None;
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
index 59b31a7..c0b218b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
@@ -20,6 +20,7 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.util.TranslatedValueMapView;
 
 import java.util.Collections;
 import java.util.List;
@@ -92,6 +93,24 @@ public final class TopicsImage {
         return Objects.hash(topicsById, topicsByName);
     }
 
+    /**
+     * Expose a view of this TopicsImage as a map from topic names to IDs.
+     *
+     * Like TopicsImage itself, this map is immutable.
+     */
+    public Map<String, Uuid> topicNameToIdView() {
+        return new TranslatedValueMapView<>(topicsByName, image -> image.id());
+    }
+
+    /**
+     * Expose a view of this TopicsImage as a map from IDs to names.
+     *
+     * Like TopicsImage itself, this map is immutable.
+     */
+    public Map<Uuid, String> topicIdToNameView() {
+        return new TranslatedValueMapView<>(topicsById, image -> image.name());
+    }
+
     @Override
     public String toString() {
         return "TopicsImage(topicsById=" + topicsById.entrySet().stream().
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 1b5fbef..40a634f 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -44,6 +44,9 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECO
 import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
 import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
@@ -83,16 +86,22 @@ public class TopicsImageTest {
         return map;
     }
 
+    private static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA");
+
+    private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+
+    private static final Uuid BAZ_UUID = Uuid.fromString("tgHBnRglT5W_RlENnuG5vg");
+
     static {
         TOPIC_IMAGES1 = Arrays.asList(
-            newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
+            newTopicImage("foo", FOO_UUID,
                 new PartitionRegistration(new int[] {2, 3, 4},
                     new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
                 new PartitionRegistration(new int[] {3, 4, 5},
                     new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684),
                 new PartitionRegistration(new int[] {2, 4, 5},
                     new int[] {2, 4, 5}, Replicas.NONE, Replicas.NONE, 2, 10, 84)),
-            newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
+            newTopicImage("bar", BAR_UUID,
                 new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
                     new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345)));
 
@@ -100,18 +109,18 @@ public class TopicsImageTest {
 
         DELTA1_RECORDS = new ArrayList<>();
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord().
-            setTopicId(Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA")),
+            setTopicId(FOO_UUID),
             REMOVE_TOPIC_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionChangeRecord().
-            setTopicId(Uuid.fromString("f62ptyETTjet8SL5ZeREiw")).
+            setTopicId(BAR_UUID).
             setPartitionId(0).setLeader(1),
             PARTITION_CHANGE_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord().
-            setName("baz").setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")),
+            setName("baz").setTopicId(BAZ_UUID),
             TOPIC_RECORD.highestSupportedVersion()));
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionRecord().
             setPartitionId(0).
-            setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")).
+            setTopicId(BAZ_UUID).
             setReplicas(Arrays.asList(1, 2, 3, 4)).
             setIsr(Arrays.asList(3, 4)).
             setRemovingReplicas(Collections.singletonList(2)).
@@ -124,10 +133,10 @@ public class TopicsImageTest {
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
 
         List<TopicImage> topics2 = Arrays.asList(
-            newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
+            newTopicImage("bar", BAR_UUID,
                 new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
                     new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 1, 2, 346)),
-            newTopicImage("baz", Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"),
+            newTopicImage("baz", BAZ_UUID,
                 new PartitionRegistration(new int[] {1, 2, 3, 4},
                     new int[] {3, 4}, new int[] {2}, new int[] {1}, 3, 2, 1)));
         IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
@@ -177,7 +186,7 @@ public class TopicsImageTest {
             new ApiMessageAndVersion(
                 new PartitionRecord()
                     .setPartitionId(1)
-                    .setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+                    .setTopicId(BAZ_UUID)
                     .setReplicas(Arrays.asList(4, 2, localId))
                     .setIsr(Arrays.asList(4, 2, localId))
                     .setLeader(4)
@@ -375,4 +384,40 @@ public class TopicsImageTest {
         TopicsImage nextImage = delta.apply();
         assertEquals(image, nextImage);
     }
+
+    @Test
+    public void testTopicNameToIdView() {
+        Map<String, Uuid> map = IMAGE1.topicNameToIdView();
+        assertTrue(map.containsKey("foo"));
+        assertEquals(FOO_UUID, map.get("foo"));
+        assertTrue(map.containsKey("bar"));
+        assertEquals(BAR_UUID, map.get("bar"));
+        assertFalse(map.containsKey("baz"));
+        assertEquals(null, map.get("baz"));
+        HashSet<Uuid> uuids = new HashSet<>();
+        map.values().iterator().forEachRemaining(u -> uuids.add(u));
+        HashSet<Uuid> expectedUuids = new HashSet<>(Arrays.asList(
+            Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
+            Uuid.fromString("f62ptyETTjet8SL5ZeREiw")));
+        assertEquals(expectedUuids, uuids);
+        assertThrows(UnsupportedOperationException.class, () -> map.remove("foo"));
+        assertThrows(UnsupportedOperationException.class, () -> map.put("bar", FOO_UUID));
+    }
+
+    @Test
+    public void testTopicIdToNameView() {
+        Map<Uuid, String> map = IMAGE1.topicIdToNameView();
+        assertTrue(map.containsKey(FOO_UUID));
+        assertEquals("foo", map.get(FOO_UUID));
+        assertTrue(map.containsKey(BAR_UUID));
+        assertEquals("bar", map.get(BAR_UUID));
+        assertFalse(map.containsKey(BAZ_UUID));
+        assertEquals(null, map.get(BAZ_UUID));
+        HashSet<String> names = new HashSet<>();
+        map.values().iterator().forEachRemaining(n -> names.add(n));
+        HashSet<String> expectedNames = new HashSet<>(Arrays.asList("foo", "bar"));
+        assertEquals(expectedNames, names);
+        assertThrows(UnsupportedOperationException.class, () -> map.remove(FOO_UUID));
+        assertThrows(UnsupportedOperationException.class, () -> map.put(FOO_UUID, "bar"));
+    }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java b/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java
new file mode 100644
index 0000000..9c85f6c
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util;
+
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+
+/**
+ * A map which presents a lightweight view of another "underlying" map. Values in the
+ * underlying map will be translated by a callback before they are returned.
+ *
+ * This class is not internally synchronized. (Typically the underlyingMap is treated as
+ * immutable.)
+ */
+public final class TranslatedValueMapView<K, V, B> extends AbstractMap<K, V> {
+    class TranslatedValueSetView extends AbstractSet<Entry<K, V>> {
+        @Override
+        public Iterator<Entry<K, V>> iterator() {
+            return new TranslatedValueEntryIterator(underlyingMap.entrySet().iterator());
+        }
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public boolean contains(Object o) {
+            if (!(o instanceof Entry)) return false;
+            Entry other = (Entry) o;
+            if (!underlyingMap.containsKey(other.getKey())) return false;
+            B value = underlyingMap.get(other.getKey());
+            V translatedValue = valueMapping.apply(value);
+            return Objects.equals(translatedValue, other.getValue());
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return underlyingMap.isEmpty();
+        }
+
+        @Override
+        public int size() {
+            return underlyingMap.size();
+        }
+    }
+
+    class TranslatedValueEntryIterator implements Iterator<Entry<K, V>> {
+        private final Iterator<Entry<K, B>> underlyingIterator;
+
+        TranslatedValueEntryIterator(Iterator<Entry<K, B>> underlyingIterator) {
+            this.underlyingIterator = underlyingIterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return underlyingIterator.hasNext();
+        }
+
+        @Override
+        public Entry<K, V> next() {
+            Entry<K, B> underlyingEntry = underlyingIterator.next();
+            return new AbstractMap.SimpleImmutableEntry<>(underlyingEntry.getKey(),
+                valueMapping.apply(underlyingEntry.getValue()));
+        }
+    }
+
+    private final Map<K, B> underlyingMap;
+    private final Function<B, V> valueMapping;
+    private final TranslatedValueSetView set;
+
+    public TranslatedValueMapView(Map<K, B> underlyingMap,
+                                  Function<B, V> valueMapping) {
+        this.underlyingMap = underlyingMap;
+        this.valueMapping = valueMapping;
+        this.set = new TranslatedValueSetView();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return underlyingMap.containsKey(key);
+    }
+
+    @Override
+    public V get(Object key) {
+        if (!underlyingMap.containsKey(key)) return null;
+        B value = underlyingMap.get(key);
+        return valueMapping.apply(value);
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+        return set;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return underlyingMap.isEmpty();
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/util/TranslatedValueMapViewTest.java b/server-common/src/test/java/org/apache/kafka/server/util/TranslatedValueMapViewTest.java
new file mode 100644
index 0000000..cc8feea
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/util/TranslatedValueMapViewTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 60)
+public class TranslatedValueMapViewTest {
+    private static Map<String, Integer> createTestMap() {
+        Map<String, Integer> testMap = new TreeMap<>();
+        testMap.put("foo", 2);
+        testMap.put("bar", 3);
+        testMap.put("baz", 5);
+        return testMap;
+    }
+
+    @Test
+    public void testContains() {
+        Map<String, Integer> underlying = createTestMap();
+        TranslatedValueMapView<String, String, Integer> view =
+            new TranslatedValueMapView<>(underlying, v -> v.toString());
+        assertTrue(view.containsKey("foo"));
+        assertTrue(view.containsKey("bar"));
+        assertTrue(view.containsKey("baz"));
+        assertFalse(view.containsKey("quux"));
+        underlying.put("quux", 101);
+        assertTrue(view.containsKey("quux"));
+    }
+
+    @Test
+    public void testIsEmptyAndSize() {
+        Map<String, Integer> underlying = new HashMap<>();
+        TranslatedValueMapView<String, String, Integer> view =
+            new TranslatedValueMapView<>(underlying, v -> v.toString());
+        assertTrue(view.isEmpty());
+        assertEquals(0, view.size());
+        underlying.put("quux", 101);
+        assertFalse(view.isEmpty());
+        assertEquals(1, view.size());
+    }
+
+    @Test
+    public void testGet() {
+        Map<String, Integer> underlying = createTestMap();
+        TranslatedValueMapView<String, String, Integer> view =
+            new TranslatedValueMapView<>(underlying, v -> v.toString());
+        assertEquals("2", view.get("foo"));
+        assertEquals("3", view.get("bar"));
+        assertEquals("5", view.get("baz"));
+        assertNull(view.get("quux"));
+        underlying.put("quux", 101);
+        assertEquals("101", view.get("quux"));
+    }
+
+    @Test
+    public void testEntrySet() {
+        Map<String, Integer> underlying = createTestMap();
+        TranslatedValueMapView<String, String, Integer> view =
+            new TranslatedValueMapView<>(underlying, v -> v.toString());
+        assertEquals(3, view.entrySet().size());
+        assertFalse(view.entrySet().isEmpty());
+        assertTrue(view.entrySet().contains(new SimpleImmutableEntry<>("foo", "2")));
+        assertFalse(view.entrySet().contains(new SimpleImmutableEntry<>("bar", "4")));
+    }
+
+    @Test
+    public void testEntrySetIterator() {
+        Map<String, Integer> underlying = createTestMap();
+        TranslatedValueMapView<String, String, Integer> view =
+            new TranslatedValueMapView<>(underlying, v -> v.toString());
+        Iterator<Entry<String, String>> iterator = view.entrySet().iterator();
+        assertTrue(iterator.hasNext());
+        assertEquals(new SimpleImmutableEntry<>("bar", "3"), iterator.next());
+        assertTrue(iterator.hasNext());
+        assertEquals(new SimpleImmutableEntry<>("baz", "5"), iterator.next());
+        assertTrue(iterator.hasNext());
+        assertEquals(new SimpleImmutableEntry<>("foo", "2"), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+}