Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/07 18:20:51 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r665599081



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -316,8 +325,16 @@ private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
         Set<String> invalidTopics = new HashSet<>();
 
         List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
+        Map<String, Uuid> topicIds = new HashMap<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
             topics.add(metadata.topic());
+            boolean changedTopicId = false;
+            if (!metadata.topicId().equals(Uuid.ZERO_UUID)) {
+                topicIds.put(metadata.topic(), metadata.topicId());

Review comment:
       nit: maybe we can pull out a local variable for `metadata.topic()`, since there are 10 or so uses.
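
       A minimal sketch of that nit. It uses a hypothetical `TopicMetadataStub` in place of `MetadataResponse.TopicMetadata` and `java.util.UUID` in place of Kafka's `Uuid`, with an all-zero UUID standing in for `Uuid.ZERO_UUID`, so that it compiles on its own:
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.UUID;

   // Hypothetical stand-in for MetadataResponse.TopicMetadata (illustration only).
   final class TopicMetadataStub {
       private final String topic;
       private final UUID topicId;

       TopicMetadataStub(String topic, UUID topicId) {
           this.topic = topic;
           this.topicId = topicId;
       }

       String topic() { return topic; }
       UUID topicId() { return topicId; }
   }

   final class TopicIdCollector {
       // Stand-in for Uuid.ZERO_UUID: the topic has no ID (e.g. an older broker responded).
       static final UUID ZERO_UUID = new UUID(0L, 0L);

       static Map<String, UUID> collectTopicIds(List<TopicMetadataStub> topicMetadata) {
           Map<String, UUID> topicIds = new HashMap<>();
           for (TopicMetadataStub metadata : topicMetadata) {
               // Bind the name once instead of calling metadata.topic() at every use site.
               String topicName = metadata.topic();
               if (!metadata.topicId().equals(ZERO_UUID)) {
                   topicIds.put(topicName, metadata.topicId());
               }
           }
           return topicIds;
       }
   }
   ```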

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+            if (shouldRetainTopic.test(entry.getKey())) {
+                newTopicIds.put(entry.getKey(), entry.getValue());
+            }
+        }
+
         for (PartitionMetadata partition : addPartitions) {
             newMetadataByPartition.put(partition.topicPartition, partition);
+            Uuid id = topicIds.get(partition.topic());
+            if (id != null)
+                newTopicIds.put(partition.topic(), id);
+            else
+                // Remove if the latest metadata does not have a topic ID

Review comment:
       What is the rationale for discarding topicId information? Is this to deal with downgrades?
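
       For reference, a minimal sketch of the per-topic update being asked about. Plain `String`/`UUID` maps stand in for the `PartitionMetadata` list and Kafka's `Uuid`, and `TopicIdMerge.mergeTopicIds` is a hypothetical name. The latest response's ID wins, and a topic whose latest metadata carries no ID loses its cached one, which is presumably where downgrades would come in:
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.UUID;

   final class TopicIdMerge {
       // Hypothetical helper mirroring the loop over addPartitions in mergeWith.
       static Map<String, UUID> mergeTopicIds(Map<String, UUID> retainedIds,
                                              Set<String> topicsInResponse,
                                              Map<String, UUID> responseIds) {
           // Start from the IDs kept for retained topics.
           Map<String, UUID> merged = new HashMap<>(retainedIds);
           for (String topic : topicsInResponse) {
               UUID id = responseIds.get(topic);
               if (id != null) {
                   // The most recent response wins, e.g. after a delete and recreate.
                   merged.put(topic, id);
               } else {
                   // No ID in the latest metadata: drop the cached one instead of keeping it.
                   merged.remove(topic);
               }
           }
           return merged;
       }
   }
   ```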

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
                             Set<String> addInvalidTopics,
                             Set<String> addInternalTopics,
                             Node newController,
+                            Map<String, Uuid> topicIds,
                             BiPredicate<String, Boolean> retainTopic) {
 
         Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+        Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
+        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
+        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {

Review comment:
       nit: map iterations are a little more readable with a `forEach`
   ```java
   this.topicIds.forEach((topicName, topicId) -> {
     ...
   });
   ```
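
       Filled in for the retention loop above, the suggestion might look like this. `shouldRetainTopic` and the source map are passed in as parameters, `RetainedTopicIds.retain` is a hypothetical name, and `UUID` stands in for Kafka's `Uuid`, purely so the sketch compiles on its own:
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.UUID;
   import java.util.function.Predicate;

   final class RetainedTopicIds {
       static Map<String, UUID> retain(Map<String, UUID> topicIds, Predicate<String> shouldRetainTopic) {
           Map<String, UUID> newTopicIds = new HashMap<>(topicIds.size());
           // forEach hands over key and value directly, with no Map.Entry boilerplate.
           topicIds.forEach((topicName, topicId) -> {
               if (shouldRetainTopic.test(topicName)) {
                   newTopicIds.put(topicName, topicId);
               }
           });
           return newTopicIds;
       }
   }
   ```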

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -367,13 +384,14 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
      */
     private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
             MetadataResponse.PartitionMetadata partitionMetadata,
-            boolean hasReliableLeaderEpoch) {
+            boolean hasReliableLeaderEpoch,
+            boolean changedTopicId) {
         TopicPartition tp = partitionMetadata.topicPartition;
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch) {
+            if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {

Review comment:
       The log message below does not make much sense if the topicId has changed. Maybe this should be a separate case?
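
       One way to split it out, as a sketch only: the `LeaderEpochTracker` class, the `EpochUpdate` enum, and the use of a `String` partition key instead of `TopicPartition` are all hypothetical. Checking `changedTopicId` first means the epoch comparison, and its log message, only applies when the partition still belongs to the same topic:
   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class LeaderEpochTracker {
       // Hypothetical outcomes, so each case can get its own log message at the call site.
       enum EpochUpdate { TOPIC_ID_CHANGED, NEWER_OR_EQUAL_EPOCH, STALE_EPOCH }

       private final Map<String, Integer> lastSeenLeaderEpochs = new HashMap<>();

       EpochUpdate update(String partition, int newEpoch, boolean changedTopicId) {
           Integer currentEpoch = lastSeenLeaderEpochs.get(partition);
           if (changedTopicId) {
               // Recreated topic: the old epoch belonged to a different topic,
               // so accept the new epoch even if it is lower.
               lastSeenLeaderEpochs.put(partition, newEpoch);
               return EpochUpdate.TOPIC_ID_CHANGED;
           } else if (currentEpoch == null || newEpoch >= currentEpoch) {
               lastSeenLeaderEpochs.put(partition, newEpoch);
               return EpochUpdate.NEWER_OR_EQUAL_EPOCH;
           } else {
               // Same topic with an older epoch: keep what is cached.
               return EpochUpdate.STALE_EPOCH;
           }
       }

       Integer lastSeenEpoch(String partition) {
           return lastSeenLeaderEpochs.get(partition);
       }
   }
   ```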

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -93,6 +104,14 @@ private MetadataCache(String clusterId,
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
+    Map<String, Uuid> topicIds() {

Review comment:
       Do we need to expose the map, or could we just have lookup methods like these?
   ```java
   Uuid topicId(String topicName);
   String topicName(Uuid topicId);
   ```
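
       A minimal sketch of the lookup-only alternative. It assumes `java.util.UUID` in place of Kafka's `Uuid`, a hypothetical `TopicIdLookup` holder, and `null` for unknown keys, since the comment does not say what a miss should return:
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.UUID;

   final class TopicIdLookup {
       private final Map<String, UUID> topicIds;
       private final Map<UUID, String> topicNames;

       TopicIdLookup(Map<String, UUID> topicIds) {
           this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));
           // Build the reverse index once so both lookups are constant-time map gets.
           Map<UUID, String> names = new HashMap<>();
           topicIds.forEach((name, id) -> names.put(id, name));
           this.topicNames = Collections.unmodifiableMap(names);
       }

       // Returns null if the topic has no known ID.
       UUID topicId(String topicName) {
           return topicIds.get(topicName);
       }

       // Returns null if the ID is unknown.
       String topicName(UUID topicId) {
           return topicNames.get(topicId);
       }
   }
   ```
       Keeping both maps private also leaves the cache free to change how it stores IDs without touching callers.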

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -874,18 +920,34 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic")));
         assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2);
         assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4);
+        assertTrue(cluster.topicIds().containsAll(topicIds.values()));
+
+        // Try removing the topic ID from keepValidTopic (simulating receiving a request from a controller with an older IBP)

Review comment:
       This test case is a lot to take in. Could this scenario be split out into a separate test case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
