Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/15 16:24:59 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769761183



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), new HashMap<>());

Review comment:
       nit: Could we use `Collections.emptyMap()`?
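
For context on this nit, here is a minimal standalone sketch of the difference between the two calls. The class `EmptyMapExample` and the method `noPartitionCounts` are hypothetical names used only for illustration and are not part of the PR. `Collections.emptyMap()` returns an immutable empty map, which states the intent ("nothing here") more directly than allocating a fresh `HashMap`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EmptyMapExample {
    // Hypothetical helper: an immutable empty map standing in for the
    // "no partition counts" argument passed to the test utility.
    public static Map<String, Integer> noPartitionCounts() {
        return Collections.emptyMap();
    }

    public static void main(String[] args) {
        Map<String, Integer> viaHashMap = new HashMap<>();
        Map<String, Integer> viaEmptyMap = noPartitionCounts();

        // Both maps are empty and compare equal under Map.equals.
        System.out.println(viaHashMap.equals(viaEmptyMap));

        // Unlike HashMap, the emptyMap() instance rejects mutation.
        try {
            viaEmptyMap.put("topic-1", 1);
        } catch (UnsupportedOperationException e) {
            System.out.println("emptyMap() is immutable");
        }
    }
}
```

The immutability is only safe if the receiving code does not try to modify the map it is handed; that is assumed here, not verified against `RequestTestUtils`.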

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.

Review comment:
       nit: Could we start all comments with a capital letter to be consistent?

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value

Review comment:
       nit: Should `return` be removed?

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>
+            val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+            assertEquals(topic, resp.topic())
+        }
+
+        // start topic deletion
+        adminZkClient.deleteTopic(topic)
+
+        // Verify that the topic is deleted when no metadata request comes in
+        TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+        // Producer should be able to send messages even after topic gets deleted and auto-created
+        assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())

Review comment:
       nit: The parentheses around `value` could be removed.

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,8 +394,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
-                // If both topic IDs were valid and the topic ID changed, update the metadata
+            // Between the time that a topic is deleted and re-created, the client may lose
+            // track of the corresponding topicId (i.e. `oldTopicId` will be null). In this case,
+            // when we discover the new topicId, we allow the corresponding leader epoch
+            // to override the last seen value.

Review comment:
       I think that @hachikuji wanted to move this comment block into the `if` branch below, perhaps right after `If the new topic ID is valid and different from the last seen topic ID, update the metadata`.
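
To make the suggested placement concrete, here is a rough standalone sketch of the decision being discussed, with the explanatory comment moved inside the `if` branch. This is not the actual `Metadata.java` code: the class `EpochUpdateSketch`, the method `shouldUpdateEpoch`, the use of `String` in place of `Uuid`, and the fallback rule in the last line (accept an epoch that is not older than the last seen one) are all assumptions made for illustration.

```java
public class EpochUpdateSketch {
    /**
     * Hypothetical helper: decides whether a newly received leader epoch
     * should replace the last seen one. Topic IDs are plain strings here
     * to keep the sketch self-contained.
     */
    public static boolean shouldUpdateEpoch(String topicId, String oldTopicId,
                                            Integer currentEpoch, int newEpoch) {
        if (topicId != null && !topicId.equals(oldTopicId)) {
            // If the new topic ID is valid and different from the last seen topic ID,
            // update the metadata.
            // Between the time that a topic is deleted and re-created, the client may lose
            // track of the corresponding topicId (i.e. `oldTopicId` will be null). In this case,
            // when we discover the new topicId, we allow the corresponding leader epoch
            // to override the last seen value.
            return true;
        }
        // Assumed fallback: with an unchanged (or unknown) topic ID, only accept
        // an epoch that is not older than the last seen value.
        return currentEpoch == null || newEpoch >= currentEpoch;
    }

    public static void main(String[] args) {
        // Topic re-created: old ID was lost (null), new ID discovered, epoch restarts at 0.
        System.out.println(shouldUpdateEpoch("new-id", null, 10, 0));
        // Same topic ID and an older epoch: the update is rejected.
        System.out.println(shouldUpdateEpoch("same-id", "same-id", 10, 5));
    }
}
```

Dropping the `oldTopicId != null` check from the condition is what lets a re-created topic with a lower epoch override the stale value, which is the scenario the comment block describes.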

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo

Review comment:
       nit: `foo` reads a bit weird here. I would just remove it. Similarly, `topic A` is weird because the topic is named `topic`.

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>

Review comment:
       nit: Using `foreach` instead of `map` would be better here.

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+

Review comment:
       nit: Empty line could be removed.

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());

Review comment:
       nit: Would it make sense to move this line right after `Start with a topic A with a topic ID foo` as it goes together with the metadata created there?

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>
+            val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+            assertEquals(topic, resp.topic())
+        }
+
+        // start topic deletion
+        adminZkClient.deleteTopic(topic)
+
+        // Verify that the topic is deleted when no metadata request comes in
+        TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+        // Producer should be able to send messages even after topic gets deleted and auto-created
+        assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())
+    }
+
+

Review comment:
       nit: One empty line could be removed here as well.

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
       It might be better to extend `IntegrationTestHarness` instead of `BaseProducerSendTest` in this case. `BaseProducerSendTest` contains unit tests that will be run as well at the moment. I don't think that we have to run them with this configuration, or do we?

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+

Review comment:
       nit: One empty line could be removed.



