You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/10/07 16:45:01 UTC
[kafka] branch trunk updated: KAFKA-13280: Avoid O(N) behavior in
KRaftMetadataCache#topicNamesToIds (#11311)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f3a0e0 KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds (#11311)
3f3a0e0 is described below
commit 3f3a0e0d9ee1142d47913e744bfd57ebbc5e5b40
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Thu Oct 7 09:41:57 2021 -0700
KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds (#11311)
Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and
KRaftMetadataCache#topicIdsToNames by returning a map subclass that
exposes the TopicsImage data structures without copying them.
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
checkstyle/import-control.xml | 1 +
.../kafka/server/metadata/KRaftMetadataCache.scala | 11 +-
.../java/org/apache/kafka/image/TopicsImage.java | 19 ++++
.../org/apache/kafka/image/TopicsImageTest.java | 63 +++++++++--
.../kafka/server/util/TranslatedValueMapView.java | 117 +++++++++++++++++++++
.../server/util/TranslatedValueMapViewTest.java | 109 +++++++++++++++++++
6 files changed, 303 insertions(+), 17 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 28ced52..ba3087b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -240,6 +240,7 @@
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.server.util" />
</subpackage>
<subpackage name="metadata">
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index b7fbd17..1ff7a80 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -236,18 +236,13 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
map(topic => topic.partitions().size())
}
- override def topicNamesToIds(): util.Map[String, Uuid] = {
- _currentImage.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava
- }
+ override def topicNamesToIds(): util.Map[String, Uuid] = _currentImage.topics.topicNameToIdView()
- override def topicIdsToNames(): util.Map[Uuid, String] = {
- _currentImage.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava
- }
+ override def topicIdsToNames(): util.Map[Uuid, String] = _currentImage.topics.topicIdToNameView()
override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
val image = _currentImage
- (image.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava,
- image.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava)
+ (image.topics.topicNameToIdView(), image.topics.topicIdToNameView())
}
// if the leader is not known, return None;
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
index 59b31a7..c0b218b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
@@ -20,6 +20,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.util.TranslatedValueMapView;
import java.util.Collections;
import java.util.List;
@@ -92,6 +93,24 @@ public final class TopicsImage {
return Objects.hash(topicsById, topicsByName);
}
+ /**
+ * Expose a view of this TopicsImage as a map from topic names to IDs.
+ *
+ * Like TopicsImage itself, this map is immutable.
+ */
+ public Map<String, Uuid> topicNameToIdView() {
+ return new TranslatedValueMapView<>(topicsByName, image -> image.id());
+ }
+
+ /**
+ * Expose a view of this TopicsImage as a map from IDs to names.
+ *
+ * Like TopicsImage itself, this map is immutable.
+ */
+ public Map<Uuid, String> topicIdToNameView() {
+ return new TranslatedValueMapView<>(topicsById, image -> image.name());
+ }
+
@Override
public String toString() {
return "TopicsImage(topicsById=" + topicsById.entrySet().stream().
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 1b5fbef..40a634f 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -44,6 +44,9 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECO
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -83,16 +86,22 @@ public class TopicsImageTest {
return map;
}
+ private static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA");
+
+ private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+
+ private static final Uuid BAZ_UUID = Uuid.fromString("tgHBnRglT5W_RlENnuG5vg");
+
static {
TOPIC_IMAGES1 = Arrays.asList(
- newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
+ newTopicImage("foo", FOO_UUID,
new PartitionRegistration(new int[] {2, 3, 4},
new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
new PartitionRegistration(new int[] {3, 4, 5},
new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684),
new PartitionRegistration(new int[] {2, 4, 5},
new int[] {2, 4, 5}, Replicas.NONE, Replicas.NONE, 2, 10, 84)),
- newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
+ newTopicImage("bar", BAR_UUID,
new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345)));
@@ -100,18 +109,18 @@ public class TopicsImageTest {
DELTA1_RECORDS = new ArrayList<>();
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord().
- setTopicId(Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA")),
+ setTopicId(FOO_UUID),
REMOVE_TOPIC_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionChangeRecord().
- setTopicId(Uuid.fromString("f62ptyETTjet8SL5ZeREiw")).
+ setTopicId(BAR_UUID).
setPartitionId(0).setLeader(1),
PARTITION_CHANGE_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord().
- setName("baz").setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")),
+ setName("baz").setTopicId(BAZ_UUID),
TOPIC_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(0).
- setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")).
+ setTopicId(BAZ_UUID).
setReplicas(Arrays.asList(1, 2, 3, 4)).
setIsr(Arrays.asList(3, 4)).
setRemovingReplicas(Collections.singletonList(2)).
@@ -124,10 +133,10 @@ public class TopicsImageTest {
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
List<TopicImage> topics2 = Arrays.asList(
- newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
+ newTopicImage("bar", BAR_UUID,
new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 1, 2, 346)),
- newTopicImage("baz", Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"),
+ newTopicImage("baz", BAZ_UUID,
new PartitionRegistration(new int[] {1, 2, 3, 4},
new int[] {3, 4}, new int[] {2}, new int[] {1}, 3, 2, 1)));
IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
@@ -177,7 +186,7 @@ public class TopicsImageTest {
new ApiMessageAndVersion(
new PartitionRecord()
.setPartitionId(1)
- .setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+ .setTopicId(BAZ_UUID)
.setReplicas(Arrays.asList(4, 2, localId))
.setIsr(Arrays.asList(4, 2, localId))
.setLeader(4)
@@ -375,4 +384,40 @@ public class TopicsImageTest {
TopicsImage nextImage = delta.apply();
assertEquals(image, nextImage);
}
+
+ @Test
+ public void testTopicNameToIdView() {
+ Map<String, Uuid> map = IMAGE1.topicNameToIdView();
+ assertTrue(map.containsKey("foo"));
+ assertEquals(FOO_UUID, map.get("foo"));
+ assertTrue(map.containsKey("bar"));
+ assertEquals(BAR_UUID, map.get("bar"));
+ assertFalse(map.containsKey("baz"));
+ assertEquals(null, map.get("baz"));
+ HashSet<Uuid> uuids = new HashSet<>();
+ map.values().iterator().forEachRemaining(u -> uuids.add(u));
+ HashSet<Uuid> expectedUuids = new HashSet<>(Arrays.asList(
+ Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
+ Uuid.fromString("f62ptyETTjet8SL5ZeREiw")));
+ assertEquals(expectedUuids, uuids);
+ assertThrows(UnsupportedOperationException.class, () -> map.remove("foo"));
+ assertThrows(UnsupportedOperationException.class, () -> map.put("bar", FOO_UUID));
+ }
+
+ @Test
+ public void testTopicIdToNameView() {
+ Map<Uuid, String> map = IMAGE1.topicIdToNameView();
+ assertTrue(map.containsKey(FOO_UUID));
+ assertEquals("foo", map.get(FOO_UUID));
+ assertTrue(map.containsKey(BAR_UUID));
+ assertEquals("bar", map.get(BAR_UUID));
+ assertFalse(map.containsKey(BAZ_UUID));
+ assertEquals(null, map.get(BAZ_UUID));
+ HashSet<String> names = new HashSet<>();
+ map.values().iterator().forEachRemaining(n -> names.add(n));
+ HashSet<String> expectedNames = new HashSet<>(Arrays.asList("foo", "bar"));
+ assertEquals(expectedNames, names);
+ assertThrows(UnsupportedOperationException.class, () -> map.remove(FOO_UUID));
+ assertThrows(UnsupportedOperationException.class, () -> map.put(FOO_UUID, "bar"));
+ }
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java b/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java
new file mode 100644
index 0000000..9c85f6c
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/util/TranslatedValueMapView.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util;
+
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+
+/**
+ * A map which presents a lightweight view of another "underlying" map. Values in the
+ * underlying map will be translated by a callback before they are returned.
+ *
+ * This class is not internally synchronized. (Typically the underlyingMap is treated as
+ * immutable.)
+ */
+public final class TranslatedValueMapView<K, V, B> extends AbstractMap<K, V> {
+ class TranslatedValueSetView extends AbstractSet<Entry<K, V>> {
+ @Override
+ public Iterator<Entry<K, V>> iterator() {
+ return new TranslatedValueEntryIterator(underlyingMap.entrySet().iterator());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean contains(Object o) {
+ if (!(o instanceof Entry)) return false;
+ Entry other = (Entry) o;
+ if (!underlyingMap.containsKey(other.getKey())) return false;
+ B value = underlyingMap.get(other.getKey());
+ V translatedValue = valueMapping.apply(value);
+ return Objects.equals(translatedValue, other.getValue());
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return underlyingMap.isEmpty();
+ }
+
+ @Override
+ public int size() {
+ return underlyingMap.size();
+ }
+ }
+
+ class TranslatedValueEntryIterator implements Iterator<Entry<K, V>> {
+ private final Iterator<Entry<K, B>> underlyingIterator;
+
+ TranslatedValueEntryIterator(Iterator<Entry<K, B>> underlyingIterator) {
+ this.underlyingIterator = underlyingIterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return underlyingIterator.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ Entry<K, B> underlyingEntry = underlyingIterator.next();
+ return new AbstractMap.SimpleImmutableEntry<>(underlyingEntry.getKey(),
+ valueMapping.apply(underlyingEntry.getValue()));
+ }
+ }
+
+ private final Map<K, B> underlyingMap;
+ private final Function<B, V> valueMapping;
+ private final TranslatedValueSetView set;
+
+ public TranslatedValueMapView(Map<K, B> underlyingMap,
+ Function<B, V> valueMapping) {
+ this.underlyingMap = underlyingMap;
+ this.valueMapping = valueMapping;
+ this.set = new TranslatedValueSetView();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return underlyingMap.containsKey(key);
+ }
+
+ @Override
+ public V get(Object key) {
+ if (!underlyingMap.containsKey(key)) return null;
+ B value = underlyingMap.get(key);
+ return valueMapping.apply(value);
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet() {
+ return set;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return underlyingMap.isEmpty();
+ }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/util/TranslatedValueMapViewTest.java b/server-common/src/test/java/org/apache/kafka/server/util/TranslatedValueMapViewTest.java
new file mode 100644
index 0000000..cc8feea
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/util/TranslatedValueMapViewTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.util;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 60)
+public class TranslatedValueMapViewTest {
+ private static Map<String, Integer> createTestMap() {
+ Map<String, Integer> testMap = new TreeMap<>();
+ testMap.put("foo", 2);
+ testMap.put("bar", 3);
+ testMap.put("baz", 5);
+ return testMap;
+ }
+
+ @Test
+ public void testContains() {
+ Map<String, Integer> underlying = createTestMap();
+ TranslatedValueMapView<String, String, Integer> view =
+ new TranslatedValueMapView<>(underlying, v -> v.toString());
+ assertTrue(view.containsKey("foo"));
+ assertTrue(view.containsKey("bar"));
+ assertTrue(view.containsKey("baz"));
+ assertFalse(view.containsKey("quux"));
+ underlying.put("quux", 101);
+ assertTrue(view.containsKey("quux"));
+ }
+
+ @Test
+ public void testIsEmptyAndSize() {
+ Map<String, Integer> underlying = new HashMap<>();
+ TranslatedValueMapView<String, String, Integer> view =
+ new TranslatedValueMapView<>(underlying, v -> v.toString());
+ assertTrue(view.isEmpty());
+ assertEquals(0, view.size());
+ underlying.put("quux", 101);
+ assertFalse(view.isEmpty());
+ assertEquals(1, view.size());
+ }
+
+ @Test
+ public void testGet() {
+ Map<String, Integer> underlying = createTestMap();
+ TranslatedValueMapView<String, String, Integer> view =
+ new TranslatedValueMapView<>(underlying, v -> v.toString());
+ assertEquals("2", view.get("foo"));
+ assertEquals("3", view.get("bar"));
+ assertEquals("5", view.get("baz"));
+ assertNull(view.get("quux"));
+ underlying.put("quux", 101);
+ assertEquals("101", view.get("quux"));
+ }
+
+ @Test
+ public void testEntrySet() {
+ Map<String, Integer> underlying = createTestMap();
+ TranslatedValueMapView<String, String, Integer> view =
+ new TranslatedValueMapView<>(underlying, v -> v.toString());
+ assertEquals(3, view.entrySet().size());
+ assertFalse(view.entrySet().isEmpty());
+ assertTrue(view.entrySet().contains(new SimpleImmutableEntry<>("foo", "2")));
+ assertFalse(view.entrySet().contains(new SimpleImmutableEntry<>("bar", "4")));
+ }
+
+ @Test
+ public void testEntrySetIterator() {
+ Map<String, Integer> underlying = createTestMap();
+ TranslatedValueMapView<String, String, Integer> view =
+ new TranslatedValueMapView<>(underlying, v -> v.toString());
+ Iterator<Entry<String, String>> iterator = view.entrySet().iterator();
+ assertTrue(iterator.hasNext());
+ assertEquals(new SimpleImmutableEntry<>("bar", "3"), iterator.next());
+ assertTrue(iterator.hasNext());
+ assertEquals(new SimpleImmutableEntry<>("baz", "5"), iterator.next());
+ assertTrue(iterator.hasNext());
+ assertEquals(new SimpleImmutableEntry<>("foo", "2"), iterator.next());
+ assertFalse(iterator.hasNext());
+ }
+}