Posted to commits@kafka.apache.org by da...@apache.org on 2021/12/16 15:17:21 UTC

[kafka] branch 2.8 updated: KAFKA-13488: Producer fails to recover if topic gets deleted midway (#11552)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new acc8b0c  KAFKA-13488: Producer fails to recover if topic gets deleted midway (#11552)
acc8b0c is described below

commit acc8b0c0b200a13ff619c02de54ed528ece7e885
Author: Prateek Agarwal <pr...@uber.com>
AuthorDate: Thu Dec 16 20:14:39 2021 +0530

    KAFKA-13488: Producer fails to recover if topic gets deleted midway (#11552)
    
    Allow the leader epoch to be re-assigned to the new value from the Metadata response if `oldTopicId` is not present in the cache. This is needed because `oldTopicId` is removed from the cache if the topic gets deleted but the leader epoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow `oldTopicId` to be null.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, David Jacot <dj...@confluent.io>
---
 .../java/org/apache/kafka/clients/Metadata.java    | 11 +--
 .../org/apache/kafka/clients/MetadataTest.java     | 34 +++++++--
 .../kafka/api/ProducerSendWhileDeletionTest.scala  | 83 ++++++++++++++++++++++
 3 files changed, 119 insertions(+), 9 deletions(-)
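
The heart of the patch is the relaxed guard in the epoch-validation path of
Metadata.java, shown in the diff below: the old check required both topic IDs
to be non-null, whereas the new check only requires the incoming one. A
minimal, standalone sketch of the resulting acceptance rule (the
`lastSeenLeaderEpochs` map mirrors the field in Metadata.java; the class and
method names here are illustrative, not part of the patch):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;

    import java.util.HashMap;
    import java.util.Map;

    public class EpochRuleSketch {
        private final Map<TopicPartition, Integer> lastSeenLeaderEpochs = new HashMap<>();

        /** Returns true if metadata carrying newEpoch for tp should be accepted. */
        public boolean acceptNewEpoch(TopicPartition tp, int newEpoch, Uuid topicId, Uuid oldTopicId) {
            Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
            if (topicId != null && !topicId.equals(oldTopicId)) {
                // The topic ID changed, or the old ID is unknown because the topic was
                // deleted and its cached ID dropped: trust the new epoch even if lower.
                lastSeenLeaderEpochs.put(tp, newEpoch);
                return true;
            } else if (currentEpoch == null || newEpoch >= currentEpoch) {
                // Same topic: only move the epoch forward.
                lastSeenLeaderEpochs.put(tp, newEpoch);
                return true;
            }
            // Stale metadata; keep the previously seen epoch.
            return false;
        }
    }

The only behavioral difference from the pre-patch code is the dropped
`oldTopicId != null` conjunct: a null `oldTopicId` now counts as a change,
which is exactly the deleted-and-recreated case described in the commit
message.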

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index e4f1756..30ae642 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -393,10 +393,13 @@ public class Metadata implements Closeable {
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
-                // If both topic IDs were valid and the topic ID changed, update the metadata
-                log.debug("Topic ID for partition {} changed from {} to {}, so this topic must have been recreated. " +
-                          "Resetting the last seen epoch to {}.", tp, oldTopicId, topicId, newEpoch);
+            if (topicId != null && !topicId.equals(oldTopicId)) {
+                // If the new topic ID is valid and different from the last seen topic ID, update the metadata.
+                // Between the time that a topic is deleted and re-created, the client may lose track of the
+                // corresponding topicId (i.e. `oldTopicId` will be null). In this case, when we discover the new
+                // topicId, we allow the corresponding leader epoch to override the last seen value.
+                log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
+                         tp, newEpoch, oldTopicId, topicId);
                 lastSeenLeaderEpochs.put(tp, newEpoch);
                 return Optional.of(partitionMetadata);
             } else if (currentEpoch == null || newEpoch >= currentEpoch) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5b560ad..a51e275 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -380,6 +380,31 @@ public class MetadataTest {
     }
 
     @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with topic-1 having a random topic ID
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic topic-1 is now deleted, so the response contains an error. The last seen leader epoch should retain the old value.
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap());
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Create topic-1 again, this time with a different topic ID. The leader epoch should be updated to the new value even if it is lower.
+        Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 5, newTopicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(5), metadata.lastSeenLeaderEpoch(tp));
+    }
+
+    @Test
     public void testEpochUpdateOnChangedTopicIds() {
         TopicPartition tp = new TopicPartition("topic-1", 0);
         Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
@@ -388,13 +413,12 @@ public class MetadataTest {
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
         // Start with a topic with no topic ID
-        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
 
-        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
-        // Don't update to an older one
-        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        // If the old topic ID is null, accept the leader epoch that arrives with the new topic ID
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
         assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
 
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
new file mode 100644
index 0000000..ec05bb2
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -0,0 +1,83 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
+    val producerCount: Int = 1
+    val brokerCount: Int = 2
+
+    serverConfig.put(KafkaConfig.NumPartitionsProp, 2.toString)
+    serverConfig.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+    serverConfig.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+
+    producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString)
+    producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000.toString)
+    producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000.toString)
+
+    /**
+     * Tests that the producer recovers on its own when a topic is deleted mid-way through producing.
+     *
+     * The producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+        val topic = "topic"
+
+        // Create the topic with broker 0 as the leader of both partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Move leadership of both partitions to broker 1 to bump the leader epoch from 0 to 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer()
+
+        (1 to numRecords).foreach { i =>
+            val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+            assertEquals(topic, resp.topic())
+        }
+
+        // Start topic deletion
+        adminZkClient.deleteTopic(topic)
+
+        // Verify that the topic deletion completes while no metadata request comes in
+        TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+        // Producer should be able to send messages even after topic gets deleted and auto-created
+        assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic())
+    }
+
+}
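
From an application's point of view, the scenario the new integration test
exercises looks roughly like the following (a minimal sketch, not part of the
patch; the bootstrap address and topic name are assumptions, and it relies on
auto.create.topics.enable=true on the broker):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    public class ProducerRecoverySketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                // Sends before the deletion succeed and cache a leader epoch for the topic.
                producer.send(new ProducerRecord<>("topic", "before".getBytes(StandardCharsets.UTF_8))).get();

                // ... the topic is deleted and auto-created out of band here ...

                // Without the fix, this send could fail with a timeout because metadata for
                // the recreated topic (carrying a lower leader epoch) was rejected. With the
                // fix, the new topic ID resets the last seen epoch and the send succeeds.
                producer.send(new ProducerRecord<>("topic", "after".getBytes(StandardCharsets.UTF_8))).get();
            }
        }
    }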