Posted to commits@kafka.apache.org by da...@apache.org on 2021/12/16 14:53:48 UTC
[kafka] branch 3.1 updated: KAFKA-13488: Producer fails to recover if topic gets deleted midway (#11552)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 0a51be2 KAFKA-13488: Producer fails to recover if topic gets deleted midway (#11552)
0a51be2 is described below
commit 0a51be27ddd7f10ce8ce17add1edb4f6ead6454e
Author: Prateek Agarwal <pr...@uber.com>
AuthorDate: Thu Dec 16 20:14:39 2021 +0530
KAFKA-13488: Producer fails to recover if topic gets deleted midway (#11552)
Allow the leader epoch to be reassigned to the new value from the metadata response when `oldTopicId` is not present in the cache. This is needed because the topic ID is removed from the cache when the topic gets deleted, but the leader epoch is not. Hence, metadata for the newly recreated topic won't be accepted unless we allow the leader epoch to be overridden when `oldTopicId` is null.
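For illustration only, here is a rough, self-contained sketch of the decision this change relaxes. The class and method names are hypothetical, java.util.UUID stands in for org.apache.kafka.common.Uuid, and this is not the actual Metadata.java code:

    import java.util.UUID;

    final class EpochOverrideSketch {
        // Decide whether the leader epoch carried by a metadata response should replace the cached one.
        // newTopicId: topic ID in the response; oldTopicId: cached topic ID (null after a deletion dropped it);
        // currentEpoch: last seen leader epoch (null if none cached); newEpoch: epoch in the response.
        static boolean shouldOverrideEpoch(UUID newTopicId, UUID oldTopicId, Integer currentEpoch, int newEpoch) {
            if (newTopicId != null && !newTopicId.equals(oldTopicId)) {
                // A valid topic ID that differs from the cached one -- including the case where the
                // cached ID is null after delete + recreate -- overrides the epoch even if it is lower.
                return true;
            }
            // Otherwise keep the monotonic rule: never move the last seen epoch backwards.
            return currentEpoch == null || newEpoch >= currentEpoch;
        }

        public static void main(String[] args) {
            // Topic was deleted (cached ID dropped to null) and recreated with a fresh ID; cached epoch is 10
            // and the response carries epoch 5. The old check rejected this; the relaxed check accepts it.
            System.out.println(shouldOverrideEpoch(UUID.randomUUID(), null, 10, 5)); // prints true
        }
    }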
Reviewers: Jason Gustafson <ja...@confluent.io>, David Jacot <dj...@confluent.io>
---
.../java/org/apache/kafka/clients/Metadata.java | 7 +-
.../org/apache/kafka/clients/MetadataTest.java | 34 +++++++--
.../kafka/api/ProducerSendWhileDeletionTest.scala | 83 ++++++++++++++++++++++
3 files changed, 117 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index d0df579..60d2c05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -394,8 +394,11 @@ public class Metadata implements Closeable {
if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
int newEpoch = partitionMetadata.leaderEpoch.get();
Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
- if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
- // If both topic IDs were valid and the topic ID changed, update the metadata
+ if (topicId != null && !topicId.equals(oldTopicId)) {
+ // If the new topic ID is valid and different from the last seen topic ID, update the metadata.
+ // Between the time that a topic is deleted and re-created, the client may lose track of the
+ // corresponding topicId (i.e. `oldTopicId` will be null). In this case, when we discover the new
+ // topicId, we allow the corresponding leader epoch to override the last seen value.
log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
tp, newEpoch, oldTopicId, topicId);
lastSeenLeaderEpochs.put(tp, newEpoch);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 1377b9e..a4383d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -373,6 +373,31 @@ public class MetadataTest {
}
@Test
+ public void testEpochUpdateAfterTopicDeletion() {
+ TopicPartition tp = new TopicPartition("topic-1", 0);
+
+ MetadataResponse metadataResponse = emptyMetadataResponse();
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+ // Start with topic topic-1 and a random topic ID
+ Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+ metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+ assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+ // Topic topic-1 is now deleted, so the response contains an error. The leader epoch should still retain the old value
+ metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap());
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+ assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+ // Create topic-1 again, this time with a different topic ID. The leader epoch should be updated to the new value even if it is lower.
+ Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+ metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 5, newTopicIds);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+ assertEquals(Optional.of(5), metadata.lastSeenLeaderEpoch(tp));
+ }
+
+ @Test
public void testEpochUpdateOnChangedTopicIds() {
TopicPartition tp = new TopicPartition("topic-1", 0);
Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
@@ -381,13 +406,12 @@ public class MetadataTest {
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
// Start with a topic with no topic ID
- metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+ metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
- assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+ assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
- // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
- // Don't update to an older one
- metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+ // If the old topic ID is null, we should accept the leader epoch that arrives with the new topic ID
+ metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
new file mode 100644
index 0000000..ec05bb2
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
+ val producerCount: Int = 1
+ val brokerCount: Int = 2
+
+ serverConfig.put(KafkaConfig.NumPartitionsProp, 2.toString)
+ serverConfig.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+ serverConfig.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+
+ producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString)
+ producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000.toString)
+ producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000.toString)
+
+ /**
+ * Tests that the producer recovers on its own when a topic is deleted mid-way through producing.
+ *
+ * Producer will attempt to send messages to the partition specified in each record, and should
+ * succeed as long as the partition is included in the metadata.
+ */
+ @Test
+ def testSendWithTopicDeletionMidWay(): Unit = {
+ val numRecords = 10
+ val topic = "topic"
+
+ // Create the topic with broker 0 as the leader for both partitions.
+ createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+ val reassignment = Map(
+ new TopicPartition(topic, 0) -> Seq(1, 0),
+ new TopicPartition(topic, 1) -> Seq(1, 0)
+ )
+
+ // Change the leader to broker 1 for both partitions to bump the leader epoch from 0 to 1
+ zkClient.createPartitionReassignment(reassignment)
+ TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+ "failed to remove reassign partitions path after completion")
+
+ val producer = createProducer()
+
+ (1 to numRecords).foreach { i =>
+ val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+ assertEquals(topic, resp.topic())
+ }
+
+ // Start topic deletion
+ adminZkClient.deleteTopic(topic)
+
+ // Verify that the topic is deleted when no metadata request comes in
+ TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+ // Producer should be able to send messages even after topic gets deleted and auto-created
+ assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic())
+ }
+
+}