Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/01 02:13:10 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

hachikuji commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759796888



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.
+            if (topicId != null && !topicId.equals(oldTopicId)) {

Review comment:
       I think this approach is reasonable. The root of the problem is the loose binding between the leader epoch and the topicId. I'm still a bit tempted to take this a little further and only permit the epoch check when the topicIds match. That is difficult to do at the moment because the epoch may be learned in contexts where we have not yet exposed the topicId. Hopefully this can be tightened up in https://issues.apache.org/jira/browse/KAFKA-13447. For now, this workaround seems ok.
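
To make the patched condition concrete, here is a minimal standalone sketch of the decision, not the actual `Metadata.java` code. `EpochDecision` and `shouldUpdateEpoch` are hypothetical names, topic IDs are modeled as plain strings rather than Kafka's own ID type, and the `newEpoch >= currentEpoch` fallback is an assumption about the surrounding branch that is not shown in this hunk.

```java
// Hypothetical stand-in for the branch under review; not the real Kafka code.
final class EpochDecision {

    // Returns true when the new leader epoch should replace the last seen one.
    static boolean shouldUpdateEpoch(String topicId, String oldTopicId,
                                     int newEpoch, Integer currentEpoch) {
        // Patched condition from the diff: a known new topic ID that differs
        // from the old one (including a null old ID) overrides the last seen epoch.
        if (topicId != null && !topicId.equals(oldTopicId)) {
            return true;
        }
        // Assumed fallback: otherwise only accept an epoch that is not older.
        return currentEpoch == null || newEpoch >= currentEpoch;
    }

    public static void main(String[] args) {
        // Old topic ID was lost during recreation; the new topic starts at a low epoch.
        System.out.println(shouldUpdateEpoch("B", null, 1, 100));
        // Same topic ID with a stale epoch.
        System.out.println(shouldUpdateEpoch("A", "A", 1, 100));
    }
}
```

With the pre-patch condition (`oldTopicId != null` required), the first call would fall through to the epoch comparison and the lower epoch of the recreated topic would be rejected.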

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.

Review comment:
       How about this?
   ```java
   // Between the time that a topic is deleted and re-created, the client may lose
   // track of the corresponding topicId (i.e. `oldTopicId` will be null). In this case,
   // when we discover the new topicId, we allow the corresponding leader epoch
   // to override the last seen value.
   ```
   
   Also, would it make sense to move this into the corresponding branch that it applies to?

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -381,13 +381,12 @@ public void testEpochUpdateOnChangedTopicIds() {
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
         // Start with a topic with no topic ID
-        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
 
-        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
-        // Don't update to an older one
-        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        // If the Older topic Id is null, we should go with the new TopicId as the leader Epoch

Review comment:
       It might be worthwhile to have a separate test case that goes through the sequence described in the JIRA (KAFKA-13488). Basically this:
   1. Receive metadata response with topicID A.
   2. Receive metadata response with UNKNOWN_TOPIC error.
   3. Receive metadata response with topicID B.
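
The three steps above can be walked through with a toy model of the cache. This is only a sketch under stated assumptions: `ToyMetadataCache` and its methods are hypothetical and do not use Kafka's test utilities, topic IDs are plain strings, and the model assumes that an unknown-topic response drops the cached topic ID while the last seen leader epoch is retained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model of the client's metadata cache; names are hypothetical, not Kafka's API.
final class ToyMetadataCache {
    private final Map<String, String> topicIds = new HashMap<>();
    private final Map<String, Integer> lastSeenEpochs = new HashMap<>();

    // Step 2: an unknown-topic error makes the client forget the topic ID.
    // Assumption: the last seen leader epoch is kept.
    void onUnknownTopic(String topic) {
        topicIds.remove(topic);
    }

    // Steps 1 and 3: a successful metadata response carrying a topic ID and leader epoch.
    void onMetadata(String topic, String topicId, int leaderEpoch) {
        String oldTopicId = topicIds.get(topic);
        Integer currentEpoch = lastSeenEpochs.get(topic);
        // Patched check: a null old ID no longer blocks the override.
        boolean idChanged = topicId != null && !topicId.equals(oldTopicId);
        if (idChanged || currentEpoch == null || leaderEpoch >= currentEpoch) {
            lastSeenEpochs.put(topic, leaderEpoch);
        }
        if (topicId != null) {
            topicIds.put(topic, topicId);
        }
    }

    Optional<Integer> lastSeenLeaderEpoch(String topic) {
        return Optional.ofNullable(lastSeenEpochs.get(topic));
    }

    public static void main(String[] args) {
        ToyMetadataCache cache = new ToyMetadataCache();
        cache.onMetadata("topic-1", "A", 100); // 1. topic ID A at epoch 100
        cache.onUnknownTopic("topic-1");       // 2. topic deleted
        cache.onMetadata("topic-1", "B", 1);   // 3. recreated as topic ID B at epoch 1
        System.out.println(cache.lastSeenLeaderEpoch("topic-1"));
    }
}
```

A real test would presumably drive `Metadata` through the same three responses and assert on `lastSeenLeaderEpoch(tp)` after the final update.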
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.
+            if (topicId != null && !topicId.equals(oldTopicId)) {
                 // If both topic IDs were valid and the topic ID changed, update the metadata

Review comment:
       nit: this comment is now slightly inaccurate. 

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
       Out of curiosity, why does this need a new class (as opposed to adding to `PlaintextProducerTest` for example)?



