You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/09/08 20:37:23 UTC
[kafka] 01/01: KAFKA-13280: Avoid O(N) behavior in
KRaftMetadataCache#topicNamesToIds
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch KAFKA-13280
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 8f2c2316923d4e03959bfb58cfbb218a0390ba68
Author: Colin P. Mccabe <cm...@confluent.io>
AuthorDate: Wed Sep 8 13:35:08 2021 -0700
KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds
Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and
KRaftMetadataCache#topicIdsToNames by returning a map subclass that
exposes the TopicsImage data structures without copying them.
---
.../kafka/server/metadata/KRaftMetadataCache.scala | 11 +-
.../java/org/apache/kafka/image/TopicsImage.java | 148 +++++++++++++++++++++
.../org/apache/kafka/image/TopicsImageTest.java | 62 +++++++--
3 files changed, 204 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index b7fbd17..1ff7a80 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -236,18 +236,13 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
map(topic => topic.partitions().size())
}
- override def topicNamesToIds(): util.Map[String, Uuid] = {
- _currentImage.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava
- }
+ override def topicNamesToIds(): util.Map[String, Uuid] = _currentImage.topics.topicNameToIdView()
- override def topicIdsToNames(): util.Map[Uuid, String] = {
- _currentImage.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava
- }
+ override def topicIdsToNames(): util.Map[Uuid, String] = _currentImage.topics.topicIdToNameView()
override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
val image = _currentImage
- (image.topics.topicsByName().asScala.map{ case (topicName, topicImage) => (topicName, topicImage.id())}.asJava,
- image.topics.topicsById().asScala.map{ case (topicId, topicImage) => (topicId, topicImage.name())}.asJava)
+ (image.topics.topicNameToIdView(), image.topics.topicIdToNameView())
}
// if the leader is not known, return None;
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
index 59b31a7..db8dc70 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
@@ -21,10 +21,16 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import java.util.AbstractMap;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.AbstractSet;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -92,6 +98,148 @@ public final class TopicsImage {
return Objects.hash(topicsById, topicsByName);
}
+    /**
+     * Returns a map from topic name to topic ID that is backed by this TopicsImage.
+     *
+     * The returned object is a view: it is constructed without copying any entries.
+     * Like TopicsImage itself, this map is immutable.
+     */
+    public Map<String, Uuid> topicNameToIdView() {
+        final Map<String, Uuid> view = new TopicNameToIdMap();
+        return view;
+    }
+
+    /**
+     * Map view over {@code topicsByName} that reports each topic's ID as the value
+     * instead of the TopicImage itself. Key lookups go straight to {@code topicsByName};
+     * no mutator is overridden.
+     */
+    class TopicNameToIdMap extends AbstractMap<String, Uuid> {
+        private final TopicNameToIdMapEntrySet entries = new TopicNameToIdMapEntrySet();
+
+        @Override
+        public boolean containsKey(Object key) {
+            return topicsByName.containsKey(key);
+        }
+
+        /**
+         * Returns the ID of the topic with the given name, or null if there is none.
+         */
+        @Override
+        public Uuid get(Object key) {
+            final TopicImage found = topicsByName.get(key);
+            return found == null ? null : found.id();
+        }
+
+        @Override
+        public Set<Entry<String, Uuid>> entrySet() {
+            return entries;
+        }
+    }
+
+    /**
+     * Entry set backing {@link TopicNameToIdMap}. Size and membership checks are answered
+     * from {@code topicsByName} directly; iteration converts each (name, TopicImage) entry
+     * into a (name, topic ID) entry as it goes.
+     */
+    class TopicNameToIdMapEntrySet extends AbstractSet<Entry<String, Uuid>> {
+        @Override
+        public Iterator<Entry<String, Uuid>> iterator() {
+            return new TopicNameToIdMapEntrySetIterator(topicsByName.entrySet().iterator());
+        }
+
+        /**
+         * Returns true if {@code o} is a map entry whose key names a topic in this image
+         * and whose value equals that topic's ID.
+         */
+        @Override
+        public boolean contains(Object o) {
+            if (!(o instanceof Entry)) return false;
+            // Wildcard-typed instead of raw: only Object-level accessors are needed,
+            // so no warning has to be suppressed.
+            Entry<?, ?> other = (Entry<?, ?>) o;
+            TopicImage image = topicsByName.get(other.getKey());
+            if (image == null) return false;
+            return image.id().equals(other.getValue());
+        }
+
+        @Override
+        public int size() {
+            return topicsByName.size();
+        }
+    }
+
+    /**
+     * Adapts an iterator over (name, TopicImage) entries into an iterator over
+     * (name, topic ID) entries. {@code remove()} is not overridden.
+     */
+    static class TopicNameToIdMapEntrySetIterator implements Iterator<Entry<String, Uuid>> {
+        private final Iterator<Entry<String, TopicImage>> delegate;
+
+        TopicNameToIdMapEntrySetIterator(Iterator<Entry<String, TopicImage>> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public Entry<String, Uuid> next() {
+            final Entry<String, TopicImage> source = delegate.next();
+            final String name = source.getKey();
+            final Uuid id = source.getValue().id();
+            return new SimpleImmutableEntry<>(name, id);
+        }
+    }
+
+    /**
+     * Returns a map from topic ID to topic name that is backed by this TopicsImage.
+     *
+     * The returned object is a view: it is constructed without copying any entries.
+     * Like TopicsImage itself, this map is immutable.
+     */
+    public Map<Uuid, String> topicIdToNameView() {
+        final Map<Uuid, String> view = new TopicIdToNameMap();
+        return view;
+    }
+
+    /**
+     * Map view over {@code topicsById} that reports each topic's name as the value
+     * instead of the TopicImage itself. Key lookups go straight to {@code topicsById};
+     * no mutator is overridden.
+     */
+    class TopicIdToNameMap extends AbstractMap<Uuid, String> {
+        private final TopicIdToNameMapEntrySet entries = new TopicIdToNameMapEntrySet();
+
+        @Override
+        public boolean containsKey(Object key) {
+            return topicsById.containsKey(key);
+        }
+
+        /**
+         * Returns the name of the topic with the given ID, or null if there is none.
+         */
+        @Override
+        public String get(Object key) {
+            final TopicImage found = topicsById.get(key);
+            return found == null ? null : found.name();
+        }
+
+        @Override
+        public Set<Entry<Uuid, String>> entrySet() {
+            return entries;
+        }
+    }
+
+    /**
+     * Entry set backing {@link TopicIdToNameMap}. Size and membership checks are answered
+     * from {@code topicsById} directly; iteration converts each (ID, TopicImage) entry
+     * into an (ID, topic name) entry as it goes.
+     */
+    class TopicIdToNameMapEntrySet extends AbstractSet<Entry<Uuid, String>> {
+        @Override
+        public Iterator<Entry<Uuid, String>> iterator() {
+            return new TopicIdToNameEntrySetIterator(topicsById.entrySet().iterator());
+        }
+
+        /**
+         * Returns true if {@code o} is a map entry whose key is the ID of a topic in this
+         * image and whose value equals that topic's name.
+         */
+        @Override
+        public boolean contains(Object o) {
+            if (!(o instanceof Entry)) return false;
+            // Wildcard-typed instead of raw: only Object-level accessors are needed,
+            // so no warning has to be suppressed.
+            Entry<?, ?> other = (Entry<?, ?>) o;
+            TopicImage image = topicsById.get(other.getKey());
+            if (image == null) return false;
+            return image.name().equals(other.getValue());
+        }
+
+        @Override
+        public int size() {
+            return topicsById.size();
+        }
+    }
+
+    /**
+     * Adapts an iterator over (ID, TopicImage) entries into an iterator over
+     * (ID, topic name) entries. {@code remove()} is not overridden.
+     */
+    static class TopicIdToNameEntrySetIterator implements Iterator<Entry<Uuid, String>> {
+        private final Iterator<Entry<Uuid, TopicImage>> delegate;
+
+        TopicIdToNameEntrySetIterator(Iterator<Entry<Uuid, TopicImage>> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public Entry<Uuid, String> next() {
+            final Entry<Uuid, TopicImage> source = delegate.next();
+            final Uuid id = source.getKey();
+            final String name = source.getValue().name();
+            return new SimpleImmutableEntry<>(id, name);
+        }
+    }
+
@Override
public String toString() {
return "TopicsImage(topicsById=" + topicsById.entrySet().stream().
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 1b5fbef..91cdd5b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -38,12 +38,16 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -83,16 +87,22 @@ public class TopicsImageTest {
return map;
}
+ private static final Uuid FOO_UUID = Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA");
+
+ private static final Uuid BAR_UUID = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+
+ private static final Uuid BAZ_UUID = Uuid.fromString("tgHBnRglT5W_RlENnuG5vg");
+
static {
TOPIC_IMAGES1 = Arrays.asList(
- newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
+ newTopicImage("foo", FOO_UUID,
new PartitionRegistration(new int[] {2, 3, 4},
new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
new PartitionRegistration(new int[] {3, 4, 5},
new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684),
new PartitionRegistration(new int[] {2, 4, 5},
new int[] {2, 4, 5}, Replicas.NONE, Replicas.NONE, 2, 10, 84)),
- newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
+ newTopicImage("bar", BAR_UUID,
new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345)));
@@ -100,18 +110,18 @@ public class TopicsImageTest {
DELTA1_RECORDS = new ArrayList<>();
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord().
- setTopicId(Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA")),
+ setTopicId(FOO_UUID),
REMOVE_TOPIC_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionChangeRecord().
- setTopicId(Uuid.fromString("f62ptyETTjet8SL5ZeREiw")).
+ setTopicId(BAR_UUID).
setPartitionId(0).setLeader(1),
PARTITION_CHANGE_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new TopicRecord().
- setName("baz").setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")),
+ setName("baz").setTopicId(BAZ_UUID),
TOPIC_RECORD.highestSupportedVersion()));
DELTA1_RECORDS.add(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(0).
- setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")).
+ setTopicId(BAZ_UUID).
setReplicas(Arrays.asList(1, 2, 3, 4)).
setIsr(Arrays.asList(3, 4)).
setRemovingReplicas(Collections.singletonList(2)).
@@ -124,10 +134,10 @@ public class TopicsImageTest {
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
List<TopicImage> topics2 = Arrays.asList(
- newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
+ newTopicImage("bar", BAR_UUID,
new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 1, 2, 346)),
- newTopicImage("baz", Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"),
+ newTopicImage("baz", BAZ_UUID,
new PartitionRegistration(new int[] {1, 2, 3, 4},
new int[] {3, 4}, new int[] {2}, new int[] {1}, 3, 2, 1)));
IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
@@ -177,7 +187,7 @@ public class TopicsImageTest {
new ApiMessageAndVersion(
new PartitionRecord()
.setPartitionId(1)
- .setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+ .setTopicId(BAZ_UUID)
.setReplicas(Arrays.asList(4, 2, localId))
.setIsr(Arrays.asList(4, 2, localId))
.setLeader(4)
@@ -375,4 +385,38 @@ public class TopicsImageTest {
TopicsImage nextImage = delta.apply();
assertEquals(image, nextImage);
}
+
+    /**
+     * Checks lookups, absent keys, the values collection and the rejection of
+     * {@code remove} on the name-to-ID view of IMAGE1.
+     */
+    @Test
+    public void testTopicNameToIdView() {
+        Map<String, Uuid> map = IMAGE1.topicNameToIdView();
+        assertTrue(map.containsKey("foo"));
+        assertEquals(FOO_UUID, map.get("foo"));
+        assertTrue(map.containsKey("bar"));
+        assertEquals(BAR_UUID, map.get("bar"));
+        assertFalse(map.containsKey("baz"));
+        assertNull(map.get("baz"));
+        // Compare as sets: the iteration order of the view is not part of what is tested.
+        HashSet<Uuid> expectedUuids = new HashSet<>(Arrays.asList(FOO_UUID, BAR_UUID));
+        assertEquals(expectedUuids, new HashSet<>(map.values()));
+        assertThrows(UnsupportedOperationException.class, () -> map.remove("foo"));
+    }
+
+    /**
+     * Checks lookups, absent keys, the values collection and the rejection of
+     * {@code remove} on the ID-to-name view of IMAGE1.
+     */
+    @Test
+    public void testTopicIdToNameView() {
+        Map<Uuid, String> map = IMAGE1.topicIdToNameView();
+        assertTrue(map.containsKey(FOO_UUID));
+        assertEquals("foo", map.get(FOO_UUID));
+        assertTrue(map.containsKey(BAR_UUID));
+        assertEquals("bar", map.get(BAR_UUID));
+        assertFalse(map.containsKey(BAZ_UUID));
+        assertNull(map.get(BAZ_UUID));
+        // Compare as sets: the iteration order of the view is not part of what is tested.
+        HashSet<String> expectedNames = new HashSet<>(Arrays.asList("foo", "bar"));
+        assertEquals(expectedNames, new HashSet<>(map.values()));
+        assertThrows(UnsupportedOperationException.class, () -> map.remove(FOO_UUID));
+    }
}