Posted to commits@kafka.apache.org by jg...@apache.org on 2021/11/15 18:53:31 UTC

[kafka] branch 2.8 updated: KAFKA-12257; Consumer mishandles topics deleted and recreated with the same name (#10952)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 1ffafb9  KAFKA-12257; Consumer mishandles topics deleted and recreated with the same name (#10952)
1ffafb9 is described below

commit 1ffafb9519d5f234b88774202dd946325d6198a0
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Thu Jul 15 16:31:42 2021 -0700

    KAFKA-12257; Consumer mishandles topics deleted and recreated with the same name (#10952)
    
    Store topic IDs in the consumer metadata cache. We always take the topic ID from the latest metadata response and remove a cached topic ID when the latest response did not return an ID for that topic. The benefit is that topic recreation becomes detectable: when the topic ID changes, the client can update its metadata even if the leader epoch is lower than the one seen previously.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
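    
    In essence, the fix augments the client's epoch-ordering check with a topic ID
    comparison. The following is a minimal standalone sketch of that decision rule,
    not the actual Metadata implementation: the class name EpochTracker is
    illustrative, and java.util.UUID stands in for Kafka's Uuid.
    
        import java.util.HashMap;
        import java.util.Map;
        import java.util.UUID;
    
        // Illustrative model of the rule in Metadata#updateLatestMetadata: accept new
        // partition metadata when the topic ID changed (the topic was recreated), or
        // when the leader epoch is at least as new as the last epoch seen.
        class EpochTracker {
            private final Map<String, Integer> lastSeenEpochs = new HashMap<>();
            private final Map<String, UUID> knownTopicIds = new HashMap<>();
    
            boolean shouldAccept(String topic, int newEpoch, UUID newTopicId) {
                UUID oldTopicId = knownTopicIds.get(topic);
                boolean topicRecreated = newTopicId != null && oldTopicId != null
                        && !newTopicId.equals(oldTopicId);
                Integer currentEpoch = lastSeenEpochs.get(topic);
                boolean epochCurrent = currentEpoch == null || newEpoch >= currentEpoch;
                if (topicRecreated || epochCurrent) {
                    lastSeenEpochs.put(topic, newEpoch);
                    if (newTopicId != null)
                        knownTopicIds.put(topic, newTopicId);
                    else
                        knownTopicIds.remove(topic); // latest response carried no ID
                    return true;
                }
                return false; // stale: same topic ID (or none) with an older epoch
            }
        }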
---
 .../java/org/apache/kafka/clients/Metadata.java    |  53 ++++++---
 .../org/apache/kafka/clients/MetadataCache.java    |  40 ++++++-
 .../apache/kafka/clients/MetadataCacheTest.java    |   3 +-
 .../org/apache/kafka/clients/MetadataTest.java     | 129 +++++++++++++++++++--
 .../consumer/internals/ConsumerMetadataTest.java   |  15 ++-
 .../kafka/common/requests/RequestTestUtils.java    |  37 ++++--
 6 files changed, 236 insertions(+), 41 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 1dc1f39..e4f1756 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -216,6 +217,13 @@ public class Metadata implements Closeable {
         }
     }
 
+    /**
+     * @return the topic ID for the given topic name or null if the ID does not exist or is not known
+     */
+    public synchronized Uuid topicId(String topicName) {
+        return cache.topicId(topicName);
+    }
+
     public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
         Optional<MetadataResponse.PartitionMetadata> maybeMetadata = partitionMetadataIfCurrent(topicPartition);
         if (!maybeMetadata.isPresent())
@@ -316,20 +324,31 @@ public class Metadata implements Closeable {
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-            topics.add(metadata.topic());
+            String topicName = metadata.topic();
+            Uuid topicId = metadata.topicId();
+            topics.add(topicName);
+            // We can only reason about topic ID changes when both IDs are valid, so keep oldTopicId null unless the new metadata contains a topic ID
+            Uuid oldTopicId = null;
+            if (!Uuid.ZERO_UUID.equals(topicId)) {
+                topicIds.put(topicName, topicId);
+                oldTopicId = cache.topicId(topicName);
+            } else {
+                topicId = null;
+            }
 
-            if (!retainTopic(metadata.topic(), metadata.isInternal(), nowMs))
+            if (!retainTopic(topicName, metadata.isInternal(), nowMs))
                 continue;
 
             if (metadata.isInternal())
-                internalTopics.add(metadata.topic());
+                internalTopics.add(topicName);
 
             if (metadata.error() == Errors.NONE) {
                 for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we need to handle
                     // the update to catch new epochs
-                    updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs())
+                    updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs(), topicId, oldTopicId)
                         .ifPresent(partitions::add);
 
                     if (partitionMetadata.error.exception() instanceof InvalidMetadataException) {
@@ -340,40 +359,48 @@ public class Metadata implements Closeable {
                 }
             } else {
                 if (metadata.error().exception() instanceof InvalidMetadataException) {
-                    log.debug("Requesting metadata update for topic {} due to error {}", metadata.topic(), metadata.error());
+                    log.debug("Requesting metadata update for topic {} due to error {}", topicName, metadata.error());
                     requestUpdate();
                 }
 
                 if (metadata.error() == Errors.INVALID_TOPIC_EXCEPTION)
-                    invalidTopics.add(metadata.topic());
+                    invalidTopics.add(topicName);
                 else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
-                    unauthorizedTopics.add(metadata.topic());
+                    unauthorizedTopics.add(topicName);
             }
         }
 
         Map<Integer, Node> nodes = metadataResponse.brokersById();
         if (isPartialUpdate)
             return this.cache.mergeWith(metadataResponse.clusterId(), nodes, partitions,
-                unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(),
+                unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds,
                 (topic, isInternal) -> !topics.contains(topic) && retainTopic(topic, isInternal, nowMs));
         else
             return new MetadataCache(metadataResponse.clusterId(), nodes, partitions,
-                unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller());
+                unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds);
     }
 
     /**
      * Compute the latest partition metadata to cache given ordering by leader epochs (if both
-     * available and reliable).
+     * available and reliable) and whether the topic ID changed.
      */
     private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
             MetadataResponse.PartitionMetadata partitionMetadata,
-            boolean hasReliableLeaderEpoch) {
+            boolean hasReliableLeaderEpoch,
+            Uuid topicId,
+            Uuid oldTopicId) {
         TopicPartition tp = partitionMetadata.topicPartition;
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
-            // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch) {
+            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+                // If both topic IDs were valid and the topic ID changed, update the metadata
+                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
+                          "Resetting the last seen epoch to {}.", tp, oldTopicId, topicId, newEpoch);
+                lastSeenLeaderEpochs.put(tp, newEpoch);
+                return Optional.of(partitionMetadata);
+            } else if (currentEpoch == null || newEpoch >= currentEpoch) {
+                // If the received leader epoch is at least the same as the previous one, update the metadata
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
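
To make the behavior concrete, here is how the sketch from the commit message
above plays out for the KAFKA-12257 recreation scenario (all values hypothetical):

    EpochTracker tracker = new EpochTracker();
    UUID id1 = UUID.randomUUID();
    tracker.shouldAccept("topic-1", 10, id1); // true: first metadata for the topic
    tracker.shouldAccept("topic-1", 5, id1);  // false: same topic ID, older epoch
    UUID id2 = UUID.randomUUID();             // topic deleted and recreated
    tracker.shouldAccept("topic-1", 3, id2);  // true: new ID resets the last seen epoch
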
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index 2da61ab..addb1ee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
 
@@ -49,6 +50,7 @@ public class MetadataCache {
     private final Set<String> internalTopics;
     private final Node controller;
     private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
+    private final Map<String, Uuid> topicIds;
 
     private Cluster clusterInstance;
 
@@ -58,8 +60,9 @@ public class MetadataCache {
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
                   Set<String> internalTopics,
-                  Node controller) {
-        this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, null);
+                  Node controller,
+                  Map<String, Uuid> topicIds) {
+        this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds, null);
     }
 
     private MetadataCache(String clusterId,
@@ -69,6 +72,7 @@ public class MetadataCache {
                           Set<String> invalidTopics,
                           Set<String> internalTopics,
                           Node controller,
+                          Map<String, Uuid> topicIds,
                           Cluster clusterInstance) {
         this.clusterId = clusterId;
         this.nodes = nodes;
@@ -76,6 +80,7 @@ public class MetadataCache {
         this.invalidTopics = invalidTopics;
         this.internalTopics = internalTopics;
         this.controller = controller;
+        this.topicIds = topicIds;
 
         this.metadataByPartition = new HashMap<>(partitions.size());
         for (PartitionMetadata p : partitions) {
@@ -93,6 +98,10 @@ public class MetadataCache {
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Uuid topicId(String topicName) {
+        return topicIds.get(topicName);
+    }
+
     Optional<Node> nodeById(int id) {
         return Optional.ofNullable(nodes.get(id));
     }
@@ -120,6 +129,7 @@ public class MetadataCache {
      * @param addUnauthorizedTopics unauthorized topics to add
      * @param addInternalTopics internal topics to add
      * @param newController the new controller node
+     * @param topicIds the mapping from topic name to topic ID from the MetadataResponse
      * @param retainTopic returns whether a topic's metadata should be retained
      * @return the merged metadata cache
      */
@@ -130,13 +140,31 @@ public class MetadataCache {
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information from the MetadataResponse. We always take the latest state, removing existing
+        // topic IDs if the latest state contains the topic name but not a topic ID.
+        this.topicIds.forEach((topicName, topicId) -> {
+            if (shouldRetainTopic.test(topicName)) {
+                newTopicIds.put(topicName, topicId);
+            }
+        });
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID
+                newTopicIds.remove(partition.topic());
         }
         for (Map.Entry<TopicPartition, PartitionMetadata> entry : metadataByPartition.entrySet()) {
             if (shouldRetainTopic.test(entry.getKey().topic())) {
@@ -149,7 +177,7 @@ public class MetadataCache {
         Set<String> newInternalTopics = fillSet(addInternalTopics, internalTopics, shouldRetainTopic);
 
         return new MetadataCache(newClusterId, newNodes, newMetadataByPartition.values(), newUnauthorizedTopics,
-                newInvalidTopics, newInternalTopics, newController);
+                newInvalidTopics, newInternalTopics, newController, newTopicIds);
     }
 
     /**
@@ -177,7 +205,7 @@ public class MetadataCache {
                 .map(metadata -> MetadataResponse.toPartitionInfo(metadata, nodes))
                 .collect(Collectors.toList());
         this.clusterInstance = new Cluster(clusterId, nodes.values(), partitionInfos, unauthorizedTopics,
-                invalidTopics, internalTopics, controller);
+                invalidTopics, internalTopics, controller, topicIds);
     }
 
     static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
@@ -189,12 +217,12 @@ public class MetadataCache {
         }
         return new MetadataCache(null, nodes, Collections.emptyList(),
                 Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
-                null, Cluster.bootstrap(addresses));
+                null, Collections.emptyMap(), Cluster.bootstrap(addresses));
     }
 
     static MetadataCache empty() {
         return new MetadataCache(null, Collections.emptyMap(), Collections.emptyList(),
-                Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Cluster.empty());
+                Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap(), Cluster.empty());
     }
 
     @Override
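
The topic ID bookkeeping in mergeWith can also be modeled on its own. A hedged
sketch, assuming retained topics keep their old IDs until the latest response
overrides or removes them (the helper name mergeTopicIds is illustrative, and the
real code derives the response's topics from its partition metadata):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    class TopicIdMerge {
        // Start from the IDs of retained topics, then let the newest response win,
        // including removing an ID when the response names a topic without one.
        static Map<String, UUID> mergeTopicIds(Map<String, UUID> oldIds,
                                               Map<String, UUID> newIds,
                                               Set<String> topicsInResponse,
                                               Set<String> retainedTopics) {
            Map<String, UUID> merged = new HashMap<>();
            oldIds.forEach((topic, id) -> {
                if (retainedTopics.contains(topic))
                    merged.put(topic, id);
            });
            for (String topic : topicsInResponse) {
                UUID id = newIds.get(topic);
                if (id != null)
                    merged.put(topic, id);
                else
                    merged.remove(topic); // latest metadata has the topic but no ID
            }
            return merged;
        }
    }
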
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
index f9f927c..387643c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
@@ -67,7 +67,8 @@ public class MetadataCacheTest {
                 Collections.emptySet(),
                 Collections.emptySet(),
                 Collections.emptySet(),
-                null);
+                null,
+                Collections.emptyMap());
 
         Cluster cluster = cache.cluster();
         assertNull(cluster.leaderFor(topicPartition));
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 9fd53cc..5b560ad 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -37,12 +38,14 @@ import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.MockClusterResourceListener;
 import org.junit.jupiter.api.Test;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -76,6 +79,11 @@ public class MetadataTest {
                 Collections.emptyList());
     }
 
+    private <T> void assertEqualCollections(Collection<T> expected, Collection<T> actual) {
+        assertTrue(expected.containsAll(actual));
+        assertTrue(actual.containsAll(expected));
+    }
+
     @Test
     public void testMetadataUpdateAfterClose() {
         metadata.close();
@@ -372,6 +380,49 @@ public class MetadataTest {
     }
 
     @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // A topic ID added where none existed before is treated as the same topic, so
+        // the normal epoch check applies: don't update to an older epoch
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Don't cause update if it's the same one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Update if we see newer epoch
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L);
+        assertEquals(Optional.of(12), metadata.lastSeenLeaderEpoch(tp));
+
+        // We should also update if we see a new topicId even if the epoch is lower
+        Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 3, newTopicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 5L);
+        assertEquals(Optional.of(3), metadata.lastSeenLeaderEpoch(tp));
+
+        // Finally, update when the topic ID is new and the epoch is higher
+        Map<String, Uuid> newTopicIds2 = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 20, newTopicIds2);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 6L);
+        assertEquals(Optional.of(20), metadata.lastSeenLeaderEpoch(tp));
+
+    }
+
+    @Test
     public void testRejectOldMetadata() {
         Map<String, Integer> partitionCounts = new HashMap<>();
         partitionCounts.put("topic-1", 1);
@@ -393,7 +444,7 @@ public class MetadataTest {
             MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 99,
                 (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
                         new MetadataResponse.PartitionMetadata(error, partition, leader,
-                            leaderEpoch, replicas, Collections.emptyList(), offlineReplicas), ApiKeys.METADATA.latestVersion());
+                            leaderEpoch, replicas, Collections.emptyList(), offlineReplicas), ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
             assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 1);
             assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
@@ -404,7 +455,7 @@ public class MetadataTest {
             MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, _tp -> 100,
                 (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
                         new MetadataResponse.PartitionMetadata(error, partition, leader,
-                            leaderEpoch, replicas, Collections.emptyList(), offlineReplicas), ApiKeys.METADATA.latestVersion());
+                            leaderEpoch, replicas, Collections.emptyList(), offlineReplicas), ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
             metadata.updateWithCurrentRequestVersion(metadataResponse, false, 20L);
             assertEquals(metadata.fetch().partition(tp).inSyncReplicas().length, 0);
             assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 100);
@@ -712,7 +763,7 @@ public class MetadataTest {
             (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
                 new MetadataResponse.PartitionMetadata(error, partition, Optional.of(node0.id()), leaderEpoch,
                     Collections.singletonList(node0.id()), Collections.emptyList(),
-                        Collections.singletonList(node1.id())), ApiKeys.METADATA.latestVersion());
+                        Collections.singletonList(node1.id())), ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
         metadata.updateWithCurrentRequestVersion(emptyMetadataResponse(), false, 0L);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 10L);
 
@@ -802,6 +853,7 @@ public class MetadataTest {
     @Test
     public void testMetadataMerge() {
         Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
 
         final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
         metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
@@ -823,17 +875,20 @@ public class MetadataTest {
         oldTopicPartitionCounts.put("oldValidTopic", 2);
         oldTopicPartitionCounts.put("keepValidTopic", 3);
 
-        retainTopics.set(new HashSet<>(Arrays.asList(
+        retainTopics.set(Utils.mkSet(
             "oldInvalidTopic",
             "keepInvalidTopic",
             "oldUnauthorizedTopic",
             "keepUnauthorizedTopic",
             "oldValidTopic",
-            "keepValidTopic")));
+            "keepValidTopic"));
 
+        topicIds.put("oldValidTopic", Uuid.randomUuid());
+        topicIds.put("keepValidTopic", Uuid.randomUuid());
         MetadataResponse metadataResponse =
-                RequestTestUtils.metadataUpdateWith(oldClusterId, oldNodes, oldTopicErrors, oldTopicPartitionCounts, _tp -> 100);
+                RequestTestUtils.metadataUpdateWithIds(oldClusterId, oldNodes, oldTopicErrors, oldTopicPartitionCounts, _tp -> 100, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
 
         // Update the metadata to add a new topic variant, "new", which will be retained with "keep". Note this
         // means that all of the "old" topics should be dropped.
@@ -845,6 +900,7 @@ public class MetadataTest {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic")));
         assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+        assertEqualCollections(cluster.topicIds(), topicIds.values());
 
         String newClusterId = "newClusterId";
         int newNodes = oldNodes + 1;
@@ -855,16 +911,20 @@ public class MetadataTest {
         newTopicPartitionCounts.put("keepValidTopic", 2);
         newTopicPartitionCounts.put("newValidTopic", 4);
 
-        retainTopics.set(new HashSet<>(Arrays.asList(
+        retainTopics.set(Utils.mkSet(
             "keepInvalidTopic",
             "newInvalidTopic",
             "keepUnauthorizedTopic",
             "newUnauthorizedTopic",
             "keepValidTopic",
-            "newValidTopic")));
+            "newValidTopic"));
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200);
+        topicIds.put("newValidTopic", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.remove("oldValidTopic");
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+        assertNull(metadata.topicId("oldValidTopic"));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
@@ -874,12 +934,14 @@ public class MetadataTest {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertEqualCollections(cluster.topicIds(), topicIds.values());
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertNull(metadata.topicId(topicName)));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
@@ -887,5 +949,52 @@ public class MetadataTest {
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
     }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(Utils.mkSet(
+                "validTopic1",
+                "validTopic2"));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from validTopic1 (simulating a metadata response from a broker with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(Utils.mkSet("validTopic1", "validTopic2"), cluster.topics());
+        assertEquals(2, cluster.partitionsForTopic("validTopic1").size());
+        assertEqualCollections(topicIds.values(), cluster.topicIds());
+        assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1"));
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 8adb6f6..73043ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -111,9 +112,12 @@ public class ConsumerMetadataTest {
 
     @Test
     public void testTransientTopics() {
+        Map<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("foo", Uuid.randomUuid());
         subscription.subscribe(singleton("foo"), new NoOpConsumerRebalanceListener());
         ConsumerMetadata metadata = newConsumerMetadata(false);
-        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith(1, singletonMap("foo", 1)), false, time.milliseconds());
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds(1, singletonMap("foo", 1), topicIds), false, time.milliseconds());
+        assertEquals(topicIds.get("foo"), metadata.topicId("foo"));
         assertFalse(metadata.updateRequested());
 
         metadata.addTransientTopics(singleton("foo"));
@@ -125,14 +129,19 @@ public class ConsumerMetadataTest {
         Map<String, Integer> topicPartitionCounts = new HashMap<>();
         topicPartitionCounts.put("foo", 1);
         topicPartitionCounts.put("bar", 1);
-        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith(1, topicPartitionCounts), false, time.milliseconds());
+        topicIds.put("bar", Uuid.randomUuid());
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds(1, topicPartitionCounts, topicIds), false, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(topicId, metadata.topicId(topicName)));
         assertFalse(metadata.updateRequested());
 
         assertEquals(Utils.mkSet("foo", "bar"), new HashSet<>(metadata.fetch().topics()));
 
         metadata.clearTransientTopics();
-        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith(1, topicPartitionCounts), false, time.milliseconds());
+        topicIds.remove("bar");
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds(1, topicPartitionCounts, topicIds), false, time.milliseconds());
         assertEquals(singleton("foo"), new HashSet<>(metadata.fetch().topics()));
+        assertEquals(topicIds.get("foo"), metadata.topicId("foo"));
+        assertEquals(topicIds.get("bar"), null);
     }
 
     private void testBasicSubscription(Set<String> expectedTopics, Set<String> expectedInternalTopics) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java
index 3334821..0d9395a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -82,6 +83,7 @@ public class RequestTestUtils {
             metadataResponseTopic
                     .setErrorCode(topicMetadata.error().code())
                     .setName(topicMetadata.topic())
+                    .setTopicId(topicMetadata.topicId())
                     .setIsInternal(topicMetadata.isInternal())
                     .setTopicAuthorizedOperations(topicMetadata.authorizedOperations());
 
@@ -109,14 +111,14 @@ public class RequestTestUtils {
                                                       final Map<String, Integer> topicPartitionCounts,
                                                       final Function<TopicPartition, Integer> epochSupplier) {
         return metadataUpdateWith("kafka-cluster", numNodes, Collections.emptyMap(),
-                topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion());
+                topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
     }
 
     public static MetadataResponse metadataUpdateWith(final String clusterId,
                                                       final int numNodes,
                                                       final Map<String, Integer> topicPartitionCounts) {
         return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(),
-                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion());
+                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
     }
 
     public static MetadataResponse metadataUpdateWith(final String clusterId,
@@ -124,7 +126,7 @@ public class RequestTestUtils {
                                                       final Map<String, Errors> topicErrors,
                                                       final Map<String, Integer> topicPartitionCounts) {
         return metadataUpdateWith(clusterId, numNodes, topicErrors,
-                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion());
+                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
     }
 
     public static MetadataResponse metadataUpdateWith(final String clusterId,
@@ -133,7 +135,7 @@ public class RequestTestUtils {
                                                       final Map<String, Integer> topicPartitionCounts,
                                                       final short responseVersion) {
         return metadataUpdateWith(clusterId, numNodes, topicErrors,
-                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, responseVersion);
+                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, responseVersion, Collections.emptyMap());
     }
 
     public static MetadataResponse metadataUpdateWith(final String clusterId,
@@ -142,7 +144,25 @@ public class RequestTestUtils {
                                                       final Map<String, Integer> topicPartitionCounts,
                                                       final Function<TopicPartition, Integer> epochSupplier) {
         return metadataUpdateWith(clusterId, numNodes, topicErrors,
-                topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion());
+                topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), Collections.emptyMap());
+    }
+
+    public static MetadataResponse metadataUpdateWithIds(final int numNodes,
+                                                         final Map<String, Integer> topicPartitionCounts,
+                                                         final Map<String, Uuid> topicIds) {
+        return metadataUpdateWith("kafka-cluster", numNodes, Collections.emptyMap(),
+                topicPartitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(),
+                topicIds);
+    }
+
+    public static MetadataResponse metadataUpdateWithIds(final String clusterId,
+                                                         final int numNodes,
+                                                         final Map<String, Errors> topicErrors,
+                                                         final Map<String, Integer> topicPartitionCounts,
+                                                         final Function<TopicPartition, Integer> epochSupplier,
+                                                         final Map<String, Uuid> topicIds) {
+        return metadataUpdateWith(clusterId, numNodes, topicErrors,
+                topicPartitionCounts, epochSupplier, MetadataResponse.PartitionMetadata::new, ApiKeys.METADATA.latestVersion(), topicIds);
     }
 
     public static MetadataResponse metadataUpdateWith(final String clusterId,
@@ -151,7 +171,8 @@ public class RequestTestUtils {
                                                       final Map<String, Integer> topicPartitionCounts,
                                                       final Function<TopicPartition, Integer> epochSupplier,
                                                       final PartitionMetadataSupplier partitionSupplier,
-                                                      final short responseVersion) {
+                                                      final short responseVersion,
+                                                      final Map<String, Uuid> topicIds) {
         final List<Node> nodes = new ArrayList<>(numNodes);
         for (int i = 0; i < numNodes; i++)
             nodes.add(new Node(i, "localhost", 1969 + i));
@@ -171,8 +192,8 @@ public class RequestTestUtils {
                         replicaIds, replicaIds, replicaIds));
             }
 
-            topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic,
-                    Topic.isInternal(topic), partitionMetadata));
+            topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic, topicIds.getOrDefault(topic, Uuid.ZERO_UUID),
+                    Topic.isInternal(topic), partitionMetadata, MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
         }
 
         for (Map.Entry<String, Errors> topicErrorEntry : topicErrors.entrySet()) {