You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/11/17 18:11:00 UTC

[kafka] branch 3.1 updated: KAFKA-12257; Consumer mishandles topics deleted and recreated with the same name (#11004)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new a749b77  KAFKA-12257; Consumer mishandles topics deleted and recreated with the same name (#11004)
a749b77 is described below

commit a749b774de3e8aa56f064a0c7e5558a6b7972ab4
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Wed Nov 17 09:57:17 2021 -0800

    KAFKA-12257; Consumer mishandles topics deleted and recreated with the same name (#11004)
    
    Store topic ID info in consumer metadata. We will always take the topic ID from the latest metadata response and remove any topic IDs from the cache if the metadata response did not return a topic ID for the topic. The benefit of this is that it lets us detect topic recreations. This allows the client to update metadata even if the leader epoch is lower than what was seen previously.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/clients/Metadata.java    |  50 +++++---
 .../org/apache/kafka/clients/MetadataCache.java    |  24 +---
 .../org/apache/kafka/clients/MetadataTest.java     | 127 +++++++++++++++++----
 .../consumer/internals/ConsumerMetadataTest.java   |   8 +-
 .../clients/consumer/internals/FetcherTest.java    |   4 +-
 .../kafka/common/requests/RequestTestUtils.java    |  32 +-----
 6 files changed, 153 insertions(+), 92 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 84ef174..d0df579 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -217,14 +217,13 @@ public class Metadata implements Closeable {
         }
     }
 
+    /**
+     * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache
+     */
     public synchronized Map<String, Uuid> topicIds() {
         return cache.topicIds();
     }
 
-    public synchronized Map<Uuid, String> topicNames() {
-        return cache.topicNames();
-    }
-
     public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
         Optional<MetadataResponse.PartitionMetadata> maybeMetadata = partitionMetadataIfCurrent(topicPartition);
         if (!maybeMetadata.isPresent())
@@ -326,22 +325,31 @@ public class Metadata implements Closeable {
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
         Map<String, Uuid> topicIds = new HashMap<>();
+        Map<String, Uuid> oldTopicIds = cache.topicIds();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-            topics.add(metadata.topic());
-            if (!metadata.topicId().equals(Uuid.ZERO_UUID))
-                topicIds.put(metadata.topic(), metadata.topicId());
+            String topicName = metadata.topic();
+            Uuid topicId = metadata.topicId();
+            topics.add(topicName);
+            // We can only reason about topic ID changes when both IDs are valid, so keep oldId null unless the new metadata contains a topic ID
+            Uuid oldTopicId = null;
+            if (!Uuid.ZERO_UUID.equals(topicId)) {
+                topicIds.put(topicName, topicId);
+                oldTopicId = oldTopicIds.get(topicName);
+            } else {
+                topicId = null;
+            }
 
-            if (!retainTopic(metadata.topic(), metadata.isInternal(), nowMs))
+            if (!retainTopic(topicName, metadata.isInternal(), nowMs))
                 continue;
 
             if (metadata.isInternal())
-                internalTopics.add(metadata.topic());
+                internalTopics.add(topicName);
 
             if (metadata.error() == Errors.NONE) {
                 for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we need to handle
                     // the update to catch new epochs
-                    updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs())
+                    updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs(), topicId, oldTopicId)
                         .ifPresent(partitions::add);
 
                     if (partitionMetadata.error.exception() instanceof InvalidMetadataException) {
@@ -352,14 +360,14 @@ public class Metadata implements Closeable {
                 }
             } else {
                 if (metadata.error().exception() instanceof InvalidMetadataException) {
-                    log.debug("Requesting metadata update for topic {} due to error {}", metadata.topic(), metadata.error());
+                    log.debug("Requesting metadata update for topic {} due to error {}", topicName, metadata.error());
                     requestUpdate();
                 }
 
                 if (metadata.error() == Errors.INVALID_TOPIC_EXCEPTION)
-                    invalidTopics.add(metadata.topic());
+                    invalidTopics.add(topicName);
                 else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
-                    unauthorizedTopics.add(metadata.topic());
+                    unauthorizedTopics.add(topicName);
             }
         }
 
@@ -375,17 +383,25 @@ public class Metadata implements Closeable {
 
     /**
      * Compute the latest partition metadata to cache given ordering by leader epochs (if both
-     * available and reliable).
+     * available and reliable) and whether the topic ID changed.
      */
     private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
             MetadataResponse.PartitionMetadata partitionMetadata,
-            boolean hasReliableLeaderEpoch) {
+            boolean hasReliableLeaderEpoch,
+            Uuid topicId,
+            Uuid oldTopicId) {
         TopicPartition tp = partitionMetadata.topicPartition;
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
-            // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch) {
+            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+                // If both topic IDs were valid and the topic ID changed, update the metadata
+                log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
+                         tp, newEpoch, oldTopicId, topicId);
+                lastSeenLeaderEpochs.put(tp, newEpoch);
+                return Optional.of(partitionMetadata);
+            } else if (currentEpoch == null || newEpoch >= currentEpoch) {
+                // If the received leader epoch is at least the same as the previous one, update the metadata
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index 01a19db..d7b6bfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -51,7 +51,6 @@ public class MetadataCache {
     private final Node controller;
     private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
     private final Map<String, Uuid> topicIds;
-    private final Map<Uuid, String> topicNames;
 
     private Cluster clusterInstance;
 
@@ -83,11 +82,6 @@ public class MetadataCache {
         this.controller = controller;
         this.topicIds = topicIds;
 
-        this.topicNames = new HashMap<>(topicIds.size());
-        for (Map.Entry<String, Uuid> entry : topicIds.entrySet()) {
-            this.topicNames.put(entry.getValue(), entry.getKey());
-        }
-
         this.metadataByPartition = new HashMap<>(partitions.size());
         for (PartitionMetadata p : partitions) {
             this.metadataByPartition.put(p.topicPartition, p);
@@ -108,10 +102,6 @@ public class MetadataCache {
         return topicIds;
     }
 
-    Map<Uuid, String> topicNames() {
-        return topicNames;
-    }
-
     Optional<Node> nodeById(int id) {
         return Optional.ofNullable(nodes.get(id));
     }
@@ -156,15 +146,13 @@ public class MetadataCache {
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
-        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
 
-        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
-        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
-        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
-            if (shouldRetainTopic.test(entry.getKey())) {
-                newTopicIds.put(entry.getKey(), entry.getValue());
-            }
-        }
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information from the MetadataResponse. We always take the latest state, removing existing
+        // topic IDs if the latest state contains the topic name but not a topic ID.
+        Map<String, Uuid> newTopicIds = topicIds.entrySet().stream()
+                .filter(entry -> shouldRetainTopic.test(entry.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index d215799..1377b9e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.MockClusterResourceListener;
 import org.junit.jupiter.api.Test;
 
@@ -307,7 +308,6 @@ public class MetadataTest {
             "MockClusterResourceListener should be called when metadata is updated with non-bootstrap Cluster");
     }
 
-
     @Test
     public void testRequestUpdate() {
         assertFalse(metadata.updateRequested());
@@ -373,6 +373,48 @@ public class MetadataTest {
     }
 
     @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+        // Don't update to an older one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Don't cause update if it's the same one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Update if we see newer epoch
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L);
+        assertEquals(Optional.of(12), metadata.lastSeenLeaderEpoch(tp));
+
+        // We should also update if we see a new topicId even if the epoch is lower
+        Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 3, newTopicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 5L);
+        assertEquals(Optional.of(3), metadata.lastSeenLeaderEpoch(tp));
+
+        // Finally, update when the topic ID is new and the epoch is higher
+        Map<String, Uuid> newTopicIds2 = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 20, newTopicIds2);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 6L);
+        assertEquals(Optional.of(20), metadata.lastSeenLeaderEpoch(tp));
+    }
+
+    @Test
     public void testRejectOldMetadata() {
         Map<String, Integer> partitionCounts = new HashMap<>();
         partitionCounts.put("topic-1", 1);
@@ -825,20 +867,21 @@ public class MetadataTest {
         oldTopicPartitionCounts.put("oldValidTopic", 2);
         oldTopicPartitionCounts.put("keepValidTopic", 3);
 
-        retainTopics.set(new HashSet<>(Arrays.asList(
+        retainTopics.set(Utils.mkSet(
             "oldInvalidTopic",
             "keepInvalidTopic",
             "oldUnauthorizedTopic",
             "keepUnauthorizedTopic",
             "oldValidTopic",
-            "keepValidTopic")));
+            "keepValidTopic"));
 
         topicIds.put("oldValidTopic", Uuid.randomUuid());
         topicIds.put("keepValidTopic", Uuid.randomUuid());
         MetadataResponse metadataResponse =
                 RequestTestUtils.metadataUpdateWithIds(oldClusterId, oldNodes, oldTopicErrors, oldTopicPartitionCounts, _tp -> 100, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
-        assertEquals(metadata.topicIds(), topicIds);
+        Map<String, Uuid> metadataTopicIds1 = metadata.topicIds();
+        retainTopics.get().forEach(topic -> assertEquals(metadataTopicIds1.get(topic), topicIds.get(topic)));
 
         // Update the metadata to add a new topic variant, "new", which will be retained with "keep". Note this
         // means that all of the "old" topics should be dropped.
@@ -850,7 +893,7 @@ public class MetadataTest {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic")));
         assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
-        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
+        assertEquals(new HashSet<>(cluster.topicIds()), new HashSet<>(topicIds.values()));
 
         String newClusterId = "newClusterId";
         int newNodes = oldNodes + 1;
@@ -861,19 +904,21 @@ public class MetadataTest {
         newTopicPartitionCounts.put("keepValidTopic", 2);
         newTopicPartitionCounts.put("newValidTopic", 4);
 
-        retainTopics.set(new HashSet<>(Arrays.asList(
+        retainTopics.set(Utils.mkSet(
             "keepInvalidTopic",
             "newInvalidTopic",
             "keepUnauthorizedTopic",
             "newUnauthorizedTopic",
             "keepValidTopic",
-            "newValidTopic")));
+            "newValidTopic"));
 
         topicIds.put("newValidTopic", Uuid.randomUuid());
         metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
         topicIds.remove("oldValidTopic");
-        assertEquals(metadata.topicIds(), topicIds);
+        Map<String, Uuid> metadataTopicIds2 = metadata.topicIds();
+        retainTopics.get().forEach(topic -> assertEquals(metadataTopicIds2.get(topic), topicIds.get(topic)));
+        assertNull(metadataTopicIds2.get("oldValidTopic"));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
@@ -883,27 +928,15 @@ public class MetadataTest {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
-        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
-
-        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
-        topicIds.remove("keepValidTopic");
-        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200, topicIds);
-        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
-        assertEquals(metadata.topicIds(), topicIds);
-
-        cluster = metadata.fetch();
-        // We still have the topic, but it just doesn't have an ID.
-        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
-        assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
-        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
-        assertEquals(Uuid.ZERO_UUID, cluster.topicId("keepValidTopic"));
+        assertEquals(new HashSet<>(cluster.topicIds()), new HashSet<>(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
         metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
-        assertEquals(metadata.topicIds(), Collections.emptyMap());
+        Map<String, Uuid> metadataTopicIds3 = metadata.topicIds();
+        topicIds.forEach((topicName, topicId) -> assertNull(metadataTopicIds3.get(topicName)));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
@@ -913,4 +946,52 @@ public class MetadataTest {
         assertEquals(cluster.topics(), Collections.emptySet());
         assertTrue(cluster.topicIds().isEmpty());
     }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(Utils.mkSet(
+                "validTopic1",
+                "validTopic2"));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        Map<String, Uuid> metadataTopicIds1 = metadata.topicIds();
+        retainTopics.get().forEach(topic -> assertEquals(metadataTopicIds1.get(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        Map<String, Uuid> metadataTopicIds2 = metadata.topicIds();
+        retainTopics.get().forEach(topic -> assertEquals(metadataTopicIds2.get(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(Utils.mkSet("validTopic1", "validTopic2"), cluster.topics());
+        assertEquals(2, cluster.partitionsForTopic("validTopic1").size());
+        assertEquals(new HashSet<>(topicIds.values()), new HashSet<>(cluster.topicIds()));
+        assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1"));
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 5401a2b..02ab81c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -117,7 +117,7 @@ public class ConsumerMetadataTest {
         subscription.subscribe(singleton("foo"), new NoOpConsumerRebalanceListener());
         ConsumerMetadata metadata = newConsumerMetadata(false);
         metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds(1, singletonMap("foo", 1), topicIds), false, time.milliseconds());
-        assertEquals(topicIds, metadata.topicIds());
+        assertEquals(topicIds.get("foo"), metadata.topicIds().get("foo"));
         assertFalse(metadata.updateRequested());
 
         metadata.addTransientTopics(singleton("foo"));
@@ -131,7 +131,8 @@ public class ConsumerMetadataTest {
         topicPartitionCounts.put("bar", 1);
         topicIds.put("bar", Uuid.randomUuid());
         metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds(1, topicPartitionCounts, topicIds), false, time.milliseconds());
-        assertEquals(topicIds, metadata.topicIds());
+        Map<String, Uuid> metadataTopicIds = metadata.topicIds();
+        topicIds.forEach((topicName, topicId) -> assertEquals(topicId, metadataTopicIds.get(topicName)));
         assertFalse(metadata.updateRequested());
 
         assertEquals(Utils.mkSet("foo", "bar"), new HashSet<>(metadata.fetch().topics()));
@@ -140,7 +141,8 @@ public class ConsumerMetadataTest {
         topicIds.remove("bar");
         metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds(1, topicPartitionCounts, topicIds), false, time.milliseconds());
         assertEquals(singleton("foo"), new HashSet<>(metadata.fetch().topics()));
-        assertEquals(topicIds, metadata.topicIds());
+        assertEquals(topicIds.get("foo"), metadata.topicIds().get("foo"));
+        assertEquals(topicIds.get("bar"), null);
     }
 
     private void testBasicSubscription(Set<String> expectedTopics, Set<String> expectedInternalTopics) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7509eb6..b3dee9e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -4345,8 +4345,8 @@ public class FetcherTest {
 
         // Inject an older version of the metadata response
         final short responseVersion = 8;
-        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1,
-            Collections.emptyMap(), partitionCounts, responseVersion, topicIds), false, 0L);
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith("dummy", 1,
+            Collections.emptyMap(), partitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, responseVersion, topicIds), false, 0L);
         fetcher.validateOffsetsIfNeeded();
         // Offset validation is skipped
         assertFalse(subscriptions.awaitingValidation(tp0));
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java
index e2366ce..d50e1b9 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java
@@ -149,7 +149,9 @@ public class RequestTestUtils {
     public static MetadataResponse metadataUpdateWithIds(final int numNodes,
                                                          final Map<String, Integer> topicPartitionCounts,
                                                          final Map<String, Uuid> topicIds) {
-        return metadataUpdateWithIds("kafka-cluster", numNodes, topicPartitionCounts, topicIds);
+        return metadataUpdateWith("kafka-cluster", numNodes, Collections.emptyMap(),
+                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(),
+                topicIds);
     }
 
     public static MetadataResponse metadataUpdateWithIds(final int numNodes,
@@ -177,34 +179,6 @@ public class RequestTestUtils {
 
     public static MetadataResponse metadataUpdateWithIds(final String clusterId,
                                                          final int numNodes,
-                                                         final Map<String, Integer> topicPartitionCounts,
-                                                         final Map<String, Uuid> topicIds) {
-        return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(),
-                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(),
-                topicIds);
-    }
-
-    public static MetadataResponse metadataUpdateWithIds(final String clusterId,
-                                                         final int numNodes,
-                                                         final Map<String, Errors> topicErrors,
-                                                         final Map<String, Integer> topicPartitionCounts,
-                                                         final Map<String, Uuid> topicIds) {
-        return metadataUpdateWith(clusterId, numNodes, topicErrors,
-                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), topicIds);
-    }
-
-    public static MetadataResponse metadataUpdateWithIds(final String clusterId,
-                                                         final int numNodes,
-                                                         final Map<String, Errors> topicErrors,
-                                                         final Map<String, Integer> topicPartitionCounts,
-                                                         final short responseVersion,
-                                                         final Map<String, Uuid> topicIds) {
-        return metadataUpdateWith(clusterId, numNodes, topicErrors,
-                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, responseVersion, topicIds);
-    }
-
-    public static MetadataResponse metadataUpdateWithIds(final String clusterId,
-                                                         final int numNodes,
                                                          final Map<String, Errors> topicErrors,
                                                          final Map<String, Integer> topicPartitionCounts,
                                                          final Function<TopicPartition, Integer> epochSupplier,