Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/01 19:41:22 UTC

[GitHub] [kafka] jolshan opened a new pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan opened a new pull request #10952:
URL: https://github.com/apache/kafka/pull/10952


   Store topic ID information in the consumer metadata. We always take the topic ID from the latest metadata response, and we remove a topic's ID from the cache if the metadata response did not return an ID for that topic.
   
   With the addition of topic IDs, when we encounter a new topic ID (a recreated topic) we can choose to take the topic's metadata even if the leader epoch is lower than the deleted topic's.
   
   The idea is that when we transition from no topic IDs to using topic IDs, we do not count the topic as new (it may be the same topic that was simply assigned an ID). We only take the lower-epoch update if a known topic ID changed.
   
   Added tests for this scenario, as well as tests for storing topic IDs and for topic IDs in the metadata cache.
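   A minimal sketch of the intended rule, for readers skimming the diff (the class and method names are illustrative, not the PR's actual code):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.Uuid;
   
   // Illustrative only: tracks topic name -> topic ID across metadata responses.
   class TopicIdTracker {
       private final Map<String, Uuid> topicIds = new HashMap<>();
   
       // Returns true if the topic looks recreated: both IDs are valid and they differ.
       boolean update(String topic, Uuid idFromResponse) {
           Uuid oldId = topicIds.get(topic);
           if (idFromResponse == null || Uuid.ZERO_UUID.equals(idFromResponse)) {
               // The latest response carried no ID (e.g. an older broker), so drop the cached one.
               topicIds.remove(topic);
               return false;
           }
           topicIds.put(topic, idFromResponse);
           // Gaining an ID for the first time is not a recreation; only a change
           // between two valid IDs is.
           return oldId != null && !oldId.equals(idFromResponse);
       }
   }
   ```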
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670630904



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -371,6 +372,43 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+        // Don't update to an older one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));

Review comment:
       Should I update this for the test above as well?







[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669064028



##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID

Review comment:
       I suppose it is not needed, but I'm not sure if it helps a lot to remove it.







[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670553369



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
       This bugged me a bit too. The issue is that the request itself uses Uuid.ZERO_UUID, so we'd just have to convert that to null. We can do that if it is clearer to read.
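       A sketch of the conversion being discussed, assuming the wire sentinel is `Uuid.ZERO_UUID` and downstream code prefers `null` (variable names are illustrative):
    ```java
    // Normalize the wire sentinel to null once, at the response boundary.
    Uuid topicId = metadata.topicId();                                 // never null on the wire
    Uuid normalizedId = Uuid.ZERO_UUID.equals(topicId) ? null : topicId;

    // Downstream comparisons can then use null consistently.
    boolean recreated = normalizedId != null
            && oldTopicId != null
            && !normalizedId.equals(oldTopicId);
    ```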







[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665652021



##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -93,6 +104,14 @@ private MetadataCache(String clusterId,
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Map<String, Uuid> topicIds() {

Review comment:
       Hmm. I suppose we could have lookup methods. This has implications for the Fetch PR though. 







[GitHub] [kafka] showuon commented on pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

showuon commented on pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#issuecomment-876418475


   Also, please remember to rebase with the latest `trunk` branch. Thanks.





[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670554176



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -216,6 +217,14 @@ public synchronized boolean updateRequested() {
         }
     }
 
+    public synchronized Uuid topicId(String topicName) {

Review comment:
       Will do







[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669815073



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+            if (currentEpoch == null || newEpoch >= currentEpoch) {
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
       Yeah, I was thinking that too. I just have to be careful when comparing to remember the zero UUID case.







[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665599081



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -316,8 +325,16 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
             topics.add(metadata.topic());
+            boolean changedTopicId = false;
+            if (!metadata.topicId().equals(Uuid.ZERO_UUID)) {
+                topicIds.put(metadata.topic(), metadata.topicId());

Review comment:
       nit: maybe we can pull out a variable for `metadata.topic()` since there are 10 or so uses

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID

Review comment:
       What is the rationale to discard topicId information? Is this to deal with downgrades?
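       (For reference, the behavior under discussion amounts to something like this condensed sketch, in which the response's view always wins; the parameters stand in for the PR's fields:)
    ```java
    static Map<String, Uuid> mergeTopicIds(Map<String, Uuid> oldIds,
                                           Map<String, Uuid> idsFromResponse,
                                           Set<String> topicsInResponse,
                                           Predicate<String> shouldRetainTopic) {
        Map<String, Uuid> merged = new HashMap<>();
        // Carry over IDs for topics retained from the previous cache.
        oldIds.forEach((topic, id) -> {
            if (shouldRetainTopic.test(topic))
                merged.put(topic, id);
        });
        // The latest response wins: add any new ID, and drop the cached ID for a topic
        // the response listed without one (e.g. a controller on an older IBP).
        for (String topic : topicsInResponse) {
            Uuid id = idsFromResponse.get(topic);
            if (id != null)
                merged.put(topic, id);
            else
                merged.remove(topic);
        }
        return merged;
    }
    ```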

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {

Review comment:
       nit: map iterations are a little more readable with a `forEach`
   ```java
   this.topicIds.forEach((topicName, topicId) -> {
     ...
   });
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -367,13 +384,14 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
      */
     private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
             MetadataResponse.PartitionMetadata partitionMetadata,
-            boolean hasReliableLeaderEpoch) {
+            boolean hasReliableLeaderEpoch,
+            boolean changedTopicId) {
         TopicPartition tp = partitionMetadata.topicPartition;
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch) {
+            if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {

Review comment:
       The log message below does not make much sense if the topicId has changed. Maybe this should be a separate case?

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -93,6 +104,14 @@ private MetadataCache(String clusterId,
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Map<String, Uuid> topicIds() {

Review comment:
       Do we need to expose the map or could we just have lookup methods:
   ```java
   Uuid topicId(String topicName);
   String topicName(Uuid topicId);
   ```

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +920,34 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)

Review comment:
       This test case is a bit much to take in. Is it possible to split this into a separate test case?







[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670046323



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata

Review comment:
       nit: move this comment into the `if`

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
       Hmm, shouldn't this check come before the epoch check? Admittedly, it's unlikely that a recreated topic would have a higher epoch, but we may as well handle that case.
   
   By the way, it's a little inconsistent that this check uses both null and `Uuid.ZERO_UUID` to represent a missing value. Maybe we can use null consistently?
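       A sketch of the suggested ordering, using null consistently for a missing ID (illustrative, not the merged code):
    ```java
    // Recreation check first: a changed ID justifies taking the new metadata
    // even when its epoch is lower than the last seen one.
    if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
        log.debug("Topic ID for partition {} changed from {} to {}. Resetting the last seen epoch to {}.",
                tp, oldTopicId, topicId, newEpoch);
        lastSeenLeaderEpochs.put(tp, newEpoch);
        return Optional.of(partitionMetadata);
    } else if (currentEpoch == null || newEpoch >= currentEpoch) {
        log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
        lastSeenLeaderEpochs.put(tp, newEpoch);
        return Optional.of(partitionMetadata);
    }
    ```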

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -216,6 +217,14 @@ public synchronized boolean updateRequested() {
         }
     }
 
+    public synchronized Uuid topicId(String topicName) {

Review comment:
       Can you document that this returns null if the topicId does not exist or is not known? Similarly for `topicName`.
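       Something along these lines, say (the wording and the delegation to `cache` are suggestions, not the merged text):
    ```java
    /**
     * @return the topic ID for the given topic name, or null if the ID is not known,
     *         e.g. the topic is not in the cache or the broker did not supply an ID
     */
    public synchronized Uuid topicId(String topicName) {
        return cache.topicIds().get(topicName); // Map.get returns null when absent
    }
    ```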

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));

Review comment:
       nit: `assertNull`

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(

Review comment:
       nit: you can use `Utils.mkSet` (a few more of these)
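       For example, with `org.apache.kafka.common.utils.Utils` imported:
    ```java
    retainTopics.set(Utils.mkSet("validTopic1", "validTopic2"));
    ```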

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {
+                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
+                                "Using the newly updated metadata.", tp, oldTopicId, topicId);

Review comment:
       Instead of "Using the newly updated metadata," maybe we can say this:
   > Resetting the last seen epoch to {}.

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -371,6 +372,43 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+        // Don't update to an older one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));

Review comment:
       nit: this seems more concise
   ```java
           assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
   ```

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -371,6 +372,43 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+        // Don't update to an older one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // Don't cause update if it's the same one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 10));
+
+        // Update if we see newer epoch
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L);
+        assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
+
+        // We should also update if we see a new topicId even if the epoch is lower

Review comment:
       We may as well cover the case when the topicId is changed _and_ the epoch is higher.
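       Something like the following could cover it (a sketch reusing the helpers in the test above; epoch 20 and the update time are arbitrary):
    ```java
    // Changed topic ID *and* a higher epoch: the update should also be taken.
    Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
    metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(),
            Collections.singletonMap("topic-1", 1), _tp -> 20, newTopicIds);
    metadata.updateWithCurrentRequestVersion(metadataResponse, false, 6L);
    assertEquals(Optional.of(20), metadata.lastSeenLeaderEpoch(tp));
    ```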

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,36 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information in the MetadataResponse.

Review comment:
       nit: "update with the newest information from the MetadataResponse."

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -863,8 +906,12 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
             "keepValidTopic",
             "newValidTopic")));
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200);
+        topicIds.put("newValidTopic", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 200, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.remove("oldValidTopic");
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+        assertEquals(metadata.topicId("oldValidTopic"), null);

Review comment:
       nit: use `assertNull`

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -316,20 +325,29 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-            topics.add(metadata.topic());
+            String topicName = metadata.topic();
+            Uuid topicId = metadata.topicId();
+            topics.add(topicName);
+            // We only update if the current metadata since we can only compare when both topics have valid IDs
+            Uuid oldTopicId = null;
+            if (!topicId.equals(Uuid.ZERO_UUID)) {

Review comment:
       nit: I think we're guaranteed that `topicId` is not null (in spite of the inconsistent `equals`), but it's still a little nicer to write this check as `!Uuid.ZERO_UUID.equals(topicId)`

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -69,13 +73,20 @@ private MetadataCache(String clusterId,
                           Set<String> invalidTopics,
                           Set<String> internalTopics,
                           Node controller,
+                          Map<String, Uuid> topicIds,
                           Cluster clusterInstance) {
         this.clusterId = clusterId;
         this.nodes = nodes;
         this.unauthorizedTopics = unauthorizedTopics;
         this.invalidTopics = invalidTopics;
         this.internalTopics = internalTopics;
         this.controller = controller;
+        this.topicIds = topicIds;
+
+        this.topicNames = new HashMap<>(topicIds.size());

Review comment:
       As far as I can tell, there are no uses of this collection (`Metadata.topicName` is not referenced). Can we get rid of it?

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -316,20 +325,29 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-            topics.add(metadata.topic());
+            String topicName = metadata.topic();
+            Uuid topicId = metadata.topicId();
+            topics.add(topicName);
+            // We only update if the current metadata since we can only compare when both topics have valid IDs

Review comment:
       nit: rephrase?

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -845,6 +887,7 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic")));
         assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));

Review comment:
       Do we want to make this assertion stronger? Or is `topicIds` a subset?

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,36 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information in the MetadataResponse.
+        // If the newest MetadataResponse:
+        //    - contains a new topic with no ID, add no IDs to newTopicIds

Review comment:
       nit: some of these cases do not seem worth mentioning. I think we're just saying that we always take the latest state, removing existing topicIds if necessary.

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(
+                "validTopic1",
+                "validTopic2")));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));

Review comment:
       nit: the first argument should be the expected one
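       That is, following JUnit's `assertEquals(expected, actual)` convention:
    ```java
    assertEquals(new HashSet<>(Arrays.asList("validTopic1", "validTopic2")), cluster.topics());
    ```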

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(
+                "validTopic1",
+                "validTopic2")));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));
+        assertEquals(cluster.partitionsForTopic("validTopic1").size(), 2);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));

Review comment:
       nit: I think we can make this assertion stronger

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(
+                "validTopic1",
+                "validTopic2")));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));
+        assertEquals(cluster.partitionsForTopic("validTopic1").size(), 2);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
+        assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1"));
+
     }
+

Review comment:
       nit: extra newline (and above before the end brace)







[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669065110



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If the topic ID changed, updated the metadata
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +
+                        "Removing last seen epoch {} for the old partition {} and adding epoch {} from new metadata", currentEpoch, tp, newEpoch);
+                lastSeenLeaderEpochs.put(tp, newEpoch);
+                return Optional.of(partitionMetadata);

Review comment:
       I see. I think the main issue here was that we would ignore metadata updates when we only looked at the epoch. I believe this PR solves that problem, but we can continue to improve beyond it.







[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669812349



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If the topic ID changed, updated the metadata
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +
+                        "Removing last seen epoch {} for the old partition {} and adding epoch {} from new metadata", currentEpoch, tp, newEpoch);
+                lastSeenLeaderEpochs.put(tp, newEpoch);
+                return Optional.of(partitionMetadata);

Review comment:
       Yes, I was just pointing out that there is still a gap.







[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670553966



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -316,20 +325,29 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-            topics.add(metadata.topic());
+            String topicName = metadata.topic();
+            Uuid topicId = metadata.topicId();
+            topics.add(topicName);
+            // We only update if the current metadata since we can only compare when both topics have valid IDs
+            Uuid oldTopicId = null;
+            if (!topicId.equals(Uuid.ZERO_UUID)) {

Review comment:
       Yeah. There cannot be a null UUID in the request, but I can rephrase.







[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670748649



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
       Yes, the logging is what I had in mind. The log message is misleading otherwise.







[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670557519



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {
+                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
+                                "Using the newly updated metadata.", tp, oldTopicId, topicId);

Review comment:
       Part of the reason I say this is that this method decides whether we use or ignore the new metadata when updating the cache in general. But I can change it to the epoch message.
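
Condensed from the hunks quoted in this thread, the two branches commit the
identical update and differ only in which debug line fires, which is why the
ordering is largely a logging choice:

    // Epoch check first; the recreated-topic branch only fires on an epoch regression.
    if (currentEpoch == null || newEpoch >= currentEpoch) {
        log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
        lastSeenLeaderEpochs.put(tp, newEpoch);
        return Optional.of(partitionMetadata);
    } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {
        log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
                "Using the newly updated metadata.", tp, oldTopicId, topicId);
        lastSeenLeaderEpochs.put(tp, newEpoch);
        return Optional.of(partitionMetadata);
    }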




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670560488



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -845,6 +887,7 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic")));
         assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));

Review comment:
       To make it stronger, do we also assert the inverse, `topicIds.values().containsAll(cluster.topicIds())`? Or am I being silly and we can just check equality here?
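
An illustrative alternative (not the test's code): since topic IDs are unique,
exact agreement can be asserted directly by comparing both sides as sets,
sidestepping the two containsAll calls:

    assertEquals(new HashSet<>(topicIds.values()), new HashSet<>(cluster.topicIds()));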




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665649142



##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID

Review comment:
       Yes, for the fetch path, we want to know when topic IDs are removed as quickly as possible so we can switch over to the older fetch version that uses topic names.
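
A hypothetical sketch of that fallback decision; the version constants are
placeholders, not the real FetchRequest versions:

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.common.Uuid;

    final class FetchVersionChooser {
        static final short NAME_BASED_VERSION = 12; // last name-keyed fetch version (assumed)
        static final short ID_BASED_VERSION = 13;   // first ID-keyed fetch version (assumed)

        // As soon as any session topic loses its ID, downgrade to the name-based version.
        static short choose(Collection<String> sessionTopics, Map<String, Uuid> topicIds) {
            boolean allHaveIds = sessionTopics.stream().allMatch(topicIds::containsKey);
            return allHaveIds ? ID_BASED_VERSION : NAME_BASED_VERSION;
        }
    }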




[GitHub] [kafka] jolshan commented on pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#issuecomment-876521556


   Yup. This merge conflict was caused by my other PR 😅


[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r668324802



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If the topic ID changed, update the metadata
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +
+                        "Removing last seen epoch {} for the old partition {} and adding epoch {} from new metadata", currentEpoch, tp, newEpoch);
+                lastSeenLeaderEpochs.put(tp, newEpoch);
+                return Optional.of(partitionMetadata);

Review comment:
       Leaving this comment here for lack of an alternative location. This patch takes a good first step in improving consumer behavior for the topic recreation case. At least we are able to detect and discard the old epoch state. In fact, it does a little more than that since, combined with the fetch validation logic, we are likely to detect that the old fetch position is no longer valid. Most likely this case would get raised to the user as a `LogTruncationException`, which might not be ideal, but at least is justifiable. However, it doesn't quite close the door on reuse of the fetch position since it may remain valid on the recreated topic. For the full solution, we probably need to track topicId in SubscriptionState as well so that we can force an offset reset whenever the topicId changes. I think it makes sense to do this in https://issues.apache.org/jira/browse/KAFKA-12975. 
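
A sketch of that follow-up idea (KAFKA-12975), not part of this patch; the class
and method names are illustrative only:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;

    final class PositionTopicIdTracker {
        private final Map<TopicPartition, Uuid> positionTopicIds = new HashMap<>();

        // Returns true if the topic was recreated under the same name, in which case
        // the cached fetch position is meaningless and an offset reset should be forced.
        boolean shouldResetPosition(TopicPartition tp, Uuid latestTopicId) {
            Uuid previous = positionTopicIds.put(tp, latestTopicId);
            return previous != null && !previous.equals(latestTopicId);
        }
    }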

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+            if (currentEpoch == null || newEpoch >= currentEpoch) {
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
       nit: seems like it would be more useful for the log message to indicate the topic ids that changed instead of the unrelated epochs.

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID

Review comment:
       Does this still make sense in the context of 3.0, which does not have topicId fetch logic?

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -93,6 +104,13 @@ private MetadataCache(String clusterId,
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Uuid topicId(String topicName) {
+        return topicIds.get(topicName);
+    }
+    String topicName(Uuid topicId) {

Review comment:
       nit: add newline between methods




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670559157



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
       Also, these checks result in the same thing. The only difference is the log debug line. If it makes more sense to log the topic ID change, I can switch the order.




[GitHub] [kafka] showuon commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r666157068



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -367,7 +386,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
      */
     private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(

Review comment:
       We should also update the Javadoc to mention the topic ID changed case.

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +149,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.

Review comment:
       Can't get what you mean here. Could you rephrase this sentence?

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -216,6 +217,14 @@ public synchronized boolean updateRequested() {
         }
     }
 
+    public synchronized Uuid topicId(String topicName) {
+        return cache.topicId(topicName);
+    }
+
+    public synchronized  String topicName(Uuid topicId) {

Review comment:
       nit: 2 spaces between `synchronized` and `String`




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670558097



##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,36 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information in the MetadataResponse.
+        // If the newest MetadataResponse:
+        //    - contains a new topic with no ID, add no IDs to newTopicIds

Review comment:
       Sure, that's easier to understand, I think.




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670557719



##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -69,13 +73,20 @@ private MetadataCache(String clusterId,
                           Set<String> invalidTopics,
                           Set<String> internalTopics,
                           Node controller,
+                          Map<String, Uuid> topicIds,
                           Cluster clusterInstance) {
         this.clusterId = clusterId;
         this.nodes = nodes;
         this.unauthorizedTopics = unauthorizedTopics;
         this.invalidTopics = invalidTopics;
         this.internalTopics = internalTopics;
         this.controller = controller;
+        this.topicIds = topicIds;
+
+        this.topicNames = new HashMap<>(topicIds.size());

Review comment:
       sure




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670562472



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
 
         // Perform another metadata update, but this time all topic metadata should be cleared.
         retainTopics.set(Collections.emptySet());
 
-        metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300);
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null));
 
         cluster = metadata.fetch();
         assertEquals(cluster.clusterResource().clusterId(), newClusterId);
         assertEquals(cluster.nodes().size(), newNodes);
         assertEquals(cluster.invalidTopics(), Collections.emptySet());
         assertEquals(cluster.unauthorizedTopics(), Collections.emptySet());
         assertEquals(cluster.topics(), Collections.emptySet());
+        assertTrue(cluster.topicIds().isEmpty());
+    }
+
+    @Test
+    public void testMetadataMergeOnIdDowngrade() {
+        Time time = new MockTime();
+        Map<String, Uuid> topicIds = new HashMap<>();
+
+        final AtomicReference<Set<String>> retainTopics = new AtomicReference<>(new HashSet<>());
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), new ClusterResourceListeners()) {
+            @Override
+            protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
+                return retainTopics.get().contains(topic);
+            }
+        };
+
+        // Initialize a metadata instance with two topics. Both will be retained.
+        String clusterId = "clusterId";
+        int nodes = 2;
+        Map<String, Integer> topicPartitionCounts = new HashMap<>();
+        topicPartitionCounts.put("validTopic1", 2);
+        topicPartitionCounts.put("validTopic2", 3);
+
+        retainTopics.set(new HashSet<>(Arrays.asList(
+                "validTopic1",
+                "validTopic2")));
+
+        topicIds.put("validTopic1", Uuid.randomUuid());
+        topicIds.put("validTopic2", Uuid.randomUuid());
+        MetadataResponse metadataResponse =
+                RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 100, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        // Try removing the topic ID from validTopic1 (simulating receiving a request from a controller with an older IBP)
+        topicIds.remove("validTopic1");
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds(clusterId, nodes, Collections.emptyMap(), topicPartitionCounts, _tp -> 200, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds());
+        retainTopics.get().forEach(topic -> assertEquals(metadata.topicId(topic), topicIds.get(topic)));
+
+        Cluster cluster = metadata.fetch();
+        // We still have the topic, but it just doesn't have an ID.
+        assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("validTopic1", "validTopic2")));

Review comment:
       I can switch it, but I was trying to keep the ordering consistent within the file. For example, the test above does the same thing:
   `assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));`
   




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669069127



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+            if (currentEpoch == null || newEpoch >= currentEpoch) {
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
       Hmm, I don't actually have access to both topic IDs in this method. I can log just the old one and/or pass in a parameter for the new one.




[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669810537



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+            if (currentEpoch == null || newEpoch >= currentEpoch) {
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            } else if (changedTopicId) {
+                log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
       If you pass the new one, then you can probably get rid of `changedTopicId`
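
That is where the later revision lands, as the hunks quoted earlier show. A
sketch of the reworked signature (parameter names assumed; the body is elided):

    private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
            MetadataResponse.PartitionMetadata partitionMetadata,
            boolean hasReliableLeaderEpoch,
            Uuid topicId,      // from the new metadata response; ZERO_UUID when absent
            Uuid oldTopicId) { // previously cached; null when unknown
        // ... epoch comparison, then the inlined changed-ID check shown earlier ...
    }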




[GitHub] [kafka] hachikuji merged pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10952:
URL: https://github.com/apache/kafka/pull/10952


   


[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670769638



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            // If both topic IDs were valid and the topic ID changed, update the metadata
+            } else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) {

Review comment:
       Ah ok. I'll switch it.




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669899334



##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID

Review comment:
       Yeah. That was my reasoning. I thought the upgrade/downgrade case would be rare and the guarantees harder to reason about there.




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665654598



##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -93,6 +104,14 @@ private MetadataCache(String clusterId,
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Map<String, Uuid> topicIds() {

Review comment:
       It is used when getting all the topic IDs to put into the fetch request/session in Fetcher. Maybe it is OK to call the method multiple times there. I also use it in tests, but maybe we could change that usage.
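
A hypothetical sketch of that Fetcher-side usage: take one snapshot of the
name-to-ID map and resolve every fetchable partition against it (the names and
the ZERO_UUID convention here are assumptions, not the Fetcher's actual code):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;

    static Map<TopicPartition, Uuid> resolveTopicIds(List<TopicPartition> fetchable,
                                                     Map<String, Uuid> topicIds) {
        Map<TopicPartition, Uuid> resolved = new HashMap<>(fetchable.size());
        for (TopicPartition tp : fetchable) {
            // ZERO_UUID stands in for "no ID known" so downstream code can fall back
            // to name-based fetching.
            resolved.put(tp, topicIds.getOrDefault(tp.topic(), Uuid.ZERO_UUID));
        }
        return resolved;
    }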




[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669881047



##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID

Review comment:
       We can leave it as is I guess since I can't think of a strong case to remove it. It is a rare situation that we would hit this case and the consequence of losing the topic ID is probably not too bad. Worst case, we might miss a recreation which occurred while the cluster was rolling to upgrade or downgrade. On the other hand, it could lead to other kinds of problems if we allow updates to the epoch information tied to a topic ID without being able to validate that the topic ID is correct, so maybe this logic is for the best.




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r670658031



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -845,6 +887,7 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("oldValidTopic", "keepValidTopic")));
         assertEquals(cluster.partitionsForTopic("oldValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 3);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));

Review comment:
       The collection types are different, so I made a helper method to test containsAll in both directions. There may be a helper somewhere else in the code, but I couldn't find it.
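
A sketch of the helper described (the name is assumed): containsAll in both
directions checks the same membership as equality would, without converting
either collection type:

    import java.util.Collection;
    import static org.junit.jupiter.api.Assertions.assertTrue;

    static <T> void assertContainsExactly(Collection<T> actual, Collection<T> expected) {
        assertTrue(actual.containsAll(expected), "actual is missing expected elements");
        assertTrue(expected.containsAll(actual), "actual has unexpected elements");
    }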



