Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/08 21:37:40 UTC

[GitHub] [kafka] jolshan opened a new pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

jolshan opened a new pull request #11004:
URL: https://github.com/apache/kafka/pull/11004


   Trunk version of https://github.com/apache/kafka/pull/10952
   
   This PR slightly cleans up some of the changes made in https://github.com/apache/kafka/pull/9944
   
   Store topic ID information in the consumer metadata. We always take the topic ID from the latest metadata response, and we remove a topic's ID from the cache if the metadata response did not return an ID for that topic.
   
   With the addition of topic IDs, when we encounter a new topic ID (i.e. a recreated topic) we can accept the topic's metadata even if its epoch is lower than that of the deleted topic.
   
   The idea is that when we move from having no topic ID to having one, we do not count the topic as new (it could simply be the same topic that now reports an ID). We only take such an update if the topic ID actually changed from one valid ID to a different valid ID.
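   
   A rough sketch of that rule, using a simplified, hypothetical helper (the real logic lives in `Metadata.updateLatestMetadata`, shown in the review discussion below):
   
   ```java
   // Simplified sketch, not the actual Metadata API. topicId/oldTopicId may be
   // null when the corresponding metadata response did not carry an ID.
   boolean shouldAcceptPartitionMetadata(Integer currentEpoch, int newEpoch,
                                         Uuid topicId, Uuid oldTopicId) {
       if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
           // Two different valid IDs: the topic was recreated, so accept the new
           // metadata even if its leader epoch is lower than the cached one.
           return true;
       }
       // Gaining or losing an ID is not treated as a recreation; fall back to the
       // usual check that the leader epoch is at least as new as the cached one.
       return currentEpoch == null || newEpoch >= currentEpoch;
   }
   ```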
   
   Added tests for this scenario as well as some tests for storing the topic IDs. Also added tests for topic IDs in the metadata cache.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r747108702



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1206,7 +1205,8 @@ private void validatePositionsOnMetadataChange() {
                     fetchable.put(node, builder);
                 }
 
-                builder.add(partition, topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(position.offset,
+                Uuid topicId = metadata.topicId(partition.topic());

Review comment:
       For my understanding -- we won't update the metadata during this method, correct? Or is there another thread that could update it?







[GitHub] [kafka] hachikuji commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r750784403



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1206,7 +1205,8 @@ private void validatePositionsOnMetadataChange() {
                     fetchable.put(node, builder);
                 }
 
-                builder.add(partition, topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(position.offset,
+                Uuid topicId = metadata.topicId(partition.topic());

Review comment:
       Apologies. I was being a bit slow here. I had not considered the possibility of the ID of a given topic changing while we were building the fetch request. I had forgotten that the fetch builder logic does allow the same topic to be included multiple times. I do agree that it is probably better not to allow this, so reverting this change makes sense.







[GitHub] [kafka] jolshan commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r666535365



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1206,7 +1205,8 @@ private void validatePositionsOnMetadataChange() {
                     fetchable.put(node, builder);
                 }
 
-                builder.add(partition, topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(position.offset,
+                Uuid topicId = metadata.topicId(partition.topic());

Review comment:
       One change we are making in this PR is to look up the topic ID for a single provided topic name. I want to double-check that the metadata (and the underlying map) cannot change while we are adding these partitions to the builder, since the builder assumes IDs do not change.







[GitHub] [kafka] hachikuji commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r749668458



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +379,49 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateOnChangedTopicIds() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic with no topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
+        // Don't update to an older one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Don't cause update if it's the same one
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 3L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Update if we see newer epoch
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 12, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 4L);
+        assertEquals(Optional.of(12), metadata.lastSeenLeaderEpoch(tp));
+
+        // We should also update if we see a new topicId even if the epoch is lower
+        Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 3, newTopicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 5L);
+        assertEquals(Optional.of(3), metadata.lastSeenLeaderEpoch(tp));
+
+        // Finally, update when the topic ID is new and the epoch is higher
+        Map<String, Uuid> newTopicIds2 = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 20, newTopicIds2);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 6L);
+        assertEquals(Optional.of(20), metadata.lastSeenLeaderEpoch(tp));
+

Review comment:
       nit: unneeded newline

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -375,17 +382,25 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 
     /**
      * Compute the latest partition metadata to cache given ordering by leader epochs (if both
-     * available and reliable).
+     * available and reliable) and whether the topic ID changed.
      */
     private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
             MetadataResponse.PartitionMetadata partitionMetadata,
-            boolean hasReliableLeaderEpoch) {
+            boolean hasReliableLeaderEpoch,
+            Uuid topicId,
+            Uuid oldTopicId) {
         TopicPartition tp = partitionMetadata.topicPartition;
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
-            // If the received leader epoch is at least the same as the previous one, update the metadata
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (currentEpoch == null || newEpoch >= currentEpoch) {
+            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+                // If both topic IDs were valid and the topic ID changed, update the metadata
+                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +

Review comment:
       Can we change this level to INFO?

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -77,6 +79,11 @@ private static MetadataResponse emptyMetadataResponse() {
                 Collections.emptyList());
     }
 
+    private <T> void assertEqualCollections(Collection<T> expected, Collection<T> actual) {

Review comment:
       nit: this name seems misleading since order could be important for arbitrary collections. Since we only have a couple uses, maybe we can get rid of it and use `assertEquals(new HashSet(A), new HashSet(B))` for example.

##########
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##########
@@ -158,13 +148,14 @@ MetadataCache mergeWith(String newClusterId,
         Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
         Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
 
-        // We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse
-        // we add if a new topic ID is added or remove if the request did not support topic IDs for this topic.
-        for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
-            if (shouldRetainTopic.test(entry.getKey())) {
-                newTopicIds.put(entry.getKey(), entry.getValue());
+        // We want the most recent topic ID. We start with the previous ID stored for retained topics and then
+        // update with newest information from the MetadataResponse. We always take the latest state, removing existing
+        // topic IDs if the latest state contains the topic name but not a topic ID.
+        this.topicIds.forEach((topicName, topicId) -> {

Review comment:
       nit: simpler or not?
   ```java
           Map<String, Uuid> newTopicIds = topicIds.entrySet().stream()
               .filter(entry -> shouldRetainTopic.test(entry.getKey()))
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1206,7 +1205,8 @@ private void validatePositionsOnMetadataChange() {
                     fetchable.put(node, builder);
                 }
 
-                builder.add(partition, topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(position.offset,
+                Uuid topicId = metadata.topicId(partition.topic());

Review comment:
       It could be updated in a separate thread. I cannot see how that would be a problem though. We do have synchronization in `Metadata`.







[GitHub] [kafka] jolshan commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r749683412



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1206,7 +1205,8 @@ private void validatePositionsOnMetadataChange() {
                     fetchable.put(node, builder);
                 }
 
-                builder.add(partition, topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), new FetchRequest.PartitionData(position.offset,
+                Uuid topicId = metadata.topicId(partition.topic());

Review comment:
       It would have been a problem before KAFKA-13111 when we assumed only one topic ID per build for a given topic name (we had a mapping), but maybe it is ok now that we store the ID in the data and use it to build the request.
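   
       (Illustration only: a rough sketch of the pattern being described, using hypothetical simplified types rather than the actual Kafka fetch builder. Each entry keeps the topic ID it was added with, so a later metadata update cannot change the ID of an entry already in the request being built.)
   
    ```java
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;

    // Hypothetical, simplified stand-in for the fetch builder.
    final class FetchEntries {
        static final class Entry {
            final TopicPartition partition;
            final Uuid topicId; // ID read (under Metadata's synchronization) at add() time

            Entry(TopicPartition partition, Uuid topicId) {
                this.partition = partition;
                this.topicId = topicId;
            }
        }

        private final List<Entry> entries = new ArrayList<>();

        // The caller reads the ID once, e.g. via a synchronized Metadata accessor,
        // and the ID travels with the entry from then on.
        void add(TopicPartition partition, Uuid topicId) {
            entries.add(new Entry(partition, topicId));
        }

        List<Entry> entries() {
            return entries;
        }
    }
    ```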







[GitHub] [kafka] hachikuji commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r745899287



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -217,12 +217,12 @@ public synchronized boolean updateRequested() {
         }
     }
 
-    public synchronized Map<String, Uuid> topicIds() {
-        return cache.topicIds();
+    public synchronized Uuid topicId(String topicName) {
+        return cache.topicId(topicName);
     }
 
-    public synchronized Map<Uuid, String> topicNames() {
-        return cache.topicNames();
+    public synchronized  String topicName(Uuid topicId) {

Review comment:
       nit: extra space after `synchronized`

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -217,12 +217,12 @@ public synchronized boolean updateRequested() {
         }
     }
 
-    public synchronized Map<String, Uuid> topicIds() {
-        return cache.topicIds();
+    public synchronized Uuid topicId(String topicName) {

Review comment:
       In the 3.0 patch, I think we added javadoc comments: https://github.com/apache/kafka/pull/10952/files#diff-97c2911e6e1b97ed9b3c4e76531a321d8ea1fc6aa2c727c27b0a5e0ced893a2cR221.
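   
       (For reference, the javadoc added there is along these lines, as also shown in a later diff in this thread:)
   
    ```java
        /**
         * @return the topic ID for the given topic name or null if the ID does not exist or is not known
         */
        public synchronized Uuid topicId(String topicName) {
            return cache.topicId(topicName);
        }
    ```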

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -389,6 +397,11 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
                 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
+            } else if (changedTopicId) {

Review comment:
       I thought we changed the order of this in the 3.0 patch. We should be checking for a changed topic id before comparing epochs.
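   
       (A sketch of the intended ordering, simplified from the diff quoted earlier in this thread -- the changed-topic-ID check comes first, and the epoch comparison is only the fallback:)
   
    ```java
    if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
        // Topic was recreated: accept the new metadata regardless of the cached epoch.
        lastSeenLeaderEpochs.put(tp, newEpoch);
        return Optional.of(partitionMetadata);
    } else if (currentEpoch == null || newEpoch >= currentEpoch) {
        // Otherwise only accept metadata whose epoch is at least as new as the cached one.
        lastSeenLeaderEpochs.put(tp, newEpoch);
        return Optional.of(partitionMetadata);
    }
    ```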







[GitHub] [kafka] hachikuji merged pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11004:
URL: https://github.com/apache/kafka/pull/11004


   





[GitHub] [kafka] jolshan commented on pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#issuecomment-965885163


   I pushed some of the changes that I had missed from the 3.0 branch. We'll see how the build goes. The tests looked ok for me locally.





[GitHub] [kafka] jolshan commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r750833284



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -218,10 +218,10 @@ public synchronized boolean updateRequested() {
     }
 
     /**
-     * @return the topic ID for the given topic name or null if the ID does not exist or is not known
+     * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache
      */
-    public synchronized Uuid topicId(String topicName) {

Review comment:
       Hmmm... this is the version that could have a new cache value? The only thing I might worry about is misuse.







[GitHub] [kafka] hachikuji commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r750820000



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -218,10 +218,10 @@ public synchronized boolean updateRequested() {
     }
 
     /**
-     * @return the topic ID for the given topic name or null if the ID does not exist or is not known
+     * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache
      */
-    public synchronized Uuid topicId(String topicName) {

Review comment:
       Any harm keeping this one? Seems like it simplified some of the uses, especially in tests.
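   
       (For example -- hypothetical test lines, with `expectedTopicId` standing in for whatever the test expects -- the per-topic accessor keeps an assertion to a single direct call instead of going through the full map:)
   
    ```java
    // With the per-topic accessor:
    assertEquals(expectedTopicId, metadata.topicId("topic-1"));

    // Going through the map instead:
    assertEquals(expectedTopicId, metadata.topicIds().get("topic-1"));
    ```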







[GitHub] [kafka] hachikuji commented on a change in pull request #11004: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name (trunk)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11004:
URL: https://github.com/apache/kafka/pull/11004#discussion_r750733985



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -395,7 +395,7 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
             if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
                 // If both topic IDs were valid and the topic ID changed, update the metadata
-                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
+                log.info("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +

Review comment:
       This is very fussy, but for some reason, the phrasing here is bugging me. The addition of "must have" almost makes the event seem more uncertain and open to interpretation. Like we need to reassure the user that our deduction is correct. Maybe we can leave that part out?
   ```java
   log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}"...
   ```
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org