Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/30 07:15:23 UTC

[GitHub] [kafka] prat0318 opened a new pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

prat0318 opened a new pull request #11552:
URL: https://github.com/apache/kafka/pull/11552


   Allow the leader epoch to be reassigned to the value from the metadata response when `oldTopicId` is not present in the cache. This is needed because, when a topic is deleted, its topic ID is removed from the cache but its last seen leader epoch is not. The recreated topic starts again from a lower leader epoch, so its metadata is not accepted unless we allow `oldTopicId` to be null.
   
   This builds on the earlier PRs #10952 and #11004, which did not solve the bug described in KAFKA-13488. This PR fixes it.
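   
   The effect of relaxing the condition can be illustrated with a minimal, self-contained sketch. It is not the actual `Metadata` code: the class `TopicIdCheck` and its method names are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`. It only compares the topic ID predicate before and after this change.
   
   ```java
   import java.util.UUID;
   
   // Hypothetical stand-in for the topic ID comparison; not the real Metadata class.
   final class TopicIdCheck {
       // Condition before this PR: both IDs must be known and must differ.
       static boolean overridesBefore(UUID topicId, UUID oldTopicId) {
           return topicId != null && oldTopicId != null && !topicId.equals(oldTopicId);
       }
   
       // Condition after this PR: a missing old ID also counts as a change.
       static boolean overridesAfter(UUID topicId, UUID oldTopicId) {
           return topicId != null && !topicId.equals(oldTopicId);
       }
   
       public static void main(String[] args) {
           UUID recreated = UUID.randomUUID();
           // After deletion the cache no longer holds the old ID, so it is null here.
           System.out.println(overridesBefore(recreated, null)); // false
           System.out.println(overridesAfter(recreated, null));  // true
       }
   }
   ```
   
   When the new topic ID is null (for example, a response that carries no topic ID), neither predicate fires and the usual epoch comparison still applies.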
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769919454



##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>

Review comment:
       done.







[GitHub] [kafka] dajac merged pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11552:
URL: https://github.com/apache/kafka/pull/11552


   





[GitHub] [kafka] prat0318 commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-992560873


   @hachikuji @jolshan Bump on the review.





[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769918971



##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
       Agreed that extending `BaseProducerSendTest` would run unnecessary tests. The class now extends `IntegrationTestHarness` instead, and I verified that the test fails without the fix and passes with it.







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769902172



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value

Review comment:
       removed.

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), new HashMap<>());

Review comment:
       ack







[GitHub] [kafka] hachikuji commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759796888



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.
+            if (topicId != null && !topicId.equals(oldTopicId)) {

Review comment:
       I think this approach is reasonable. The root of the problem is the loose binding between the leader epoch and the topicId. I'm still a bit tempted to take this a little further and only permit the epoch check when the topicIds are matching. This is a little difficult to do at the moment because the epoch may be learned in contexts where we have not yet exposed the topicId. Hopefully this can be tightened up in https://issues.apache.org/jira/browse/KAFKA-13447. For now, this workaround seems ok.
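       
       As a rough illustration of the tighter binding mentioned above, the sketch below pairs each epoch with the topic ID it was learned for and only applies the epoch check when the IDs match. Everything here is hypothetical (the `BoundEpoch` class, its fields, and `shouldAccept`); it is not code from this PR or from KAFKA-13447, and it ignores the difficulty that epochs may be learned where the topic ID is not yet exposed.
       
   ```java
   import java.util.Objects;
   import java.util.UUID;
   
   // Hypothetical value type pairing a leader epoch with the topic ID it belongs to.
   final class BoundEpoch {
       final UUID topicId; // may be null when the topic ID was not exposed
       final int epoch;
   
       BoundEpoch(UUID topicId, int epoch) {
           this.topicId = topicId;
           this.epoch = epoch;
       }
   
       // The epoch comparison is only meaningful when both values refer to the same topic ID.
       static boolean shouldAccept(BoundEpoch current, BoundEpoch candidate) {
           if (current == null) {
               return true; // nothing cached yet
           }
           boolean sameTopic = Objects.equals(current.topicId, candidate.topicId);
           return !sameTopic || candidate.epoch >= current.epoch;
       }
   
       public static void main(String[] args) {
           UUID a = UUID.randomUUID();
           UUID b = UUID.randomUUID();
           // Different topic ID: accepted even though the epoch is lower.
           System.out.println(shouldAccept(new BoundEpoch(a, 10), new BoundEpoch(b, 0)));
           // Same topic ID: an older epoch is rejected.
           System.out.println(shouldAccept(new BoundEpoch(a, 10), new BoundEpoch(a, 5)));
       }
   }
   ```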

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.

Review comment:
       How about this?
   ```java
   // Between the time that a topic is deleted and re-created, the client may lose
   // track of the corresponding topicId (i.e. `oldTopicId` will be null). In this case,
   // when we discover the new topicId, we allow the corresponding leader epoch
   // to override the last seen value.
   ```
   
   Also, would it make sense to move this into the corresponding branch that it applies to?

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -381,13 +381,12 @@ public void testEpochUpdateOnChangedTopicIds() {
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
         // Start with a topic with no topic ID
-        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
 
-        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
-        // Don't update to an older one
-        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        // If the Older topic Id is null, we should go with the new TopicId as the leader Epoch

Review comment:
       It might be worthwhile having a separate case which goes through the sequence described in the jira. Basically this:
   1. Receive metadata response with topicID A.
   2. Receive metadata response with UNKNOWN_TOPIC error.
   3. Receive metadata response with topicID B.
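   
   The three steps above can be walked through with a small, self-contained model. This is only a sketch under stated assumptions: `EpochCacheSketch` and its methods are hypothetical, it tracks a single epoch per topic rather than per partition, `java.util.UUID` stands in for Kafka's `Uuid`, and the stale-epoch rule (`newEpoch >= currentEpoch`) is a simplification of the client's behavior.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.UUID;
   
   // Simplified, hypothetical model of the client's metadata cache (one epoch per topic).
   final class EpochCacheSketch {
       private final Map<String, UUID> topicIds = new HashMap<>();
       private final Map<String, Integer> lastSeenEpochs = new HashMap<>();
   
       // Successful metadata response for a topic.
       void onMetadata(String topic, UUID topicId, int newEpoch) {
           UUID oldTopicId = topicIds.put(topic, topicId);
           Integer currentEpoch = lastSeenEpochs.get(topic);
           if (topicId != null && !topicId.equals(oldTopicId)) {
               // Changed or previously unknown topic ID: the new epoch overrides the cached one.
               lastSeenEpochs.put(topic, newEpoch);
           } else if (currentEpoch == null || newEpoch >= currentEpoch) {
               lastSeenEpochs.put(topic, newEpoch);
           }
           // Otherwise the response is treated as stale and the cached epoch is kept.
       }
   
       // UNKNOWN_TOPIC error: the topic ID is forgotten, the last seen epoch is kept.
       void onUnknownTopic(String topic) {
           topicIds.remove(topic);
       }
   
       Integer lastSeenEpoch(String topic) {
           return lastSeenEpochs.get(topic);
       }
   
       public static void main(String[] args) {
           EpochCacheSketch cache = new EpochCacheSketch();
           cache.onMetadata("topic-1", UUID.randomUUID(), 10); // 1. topic ID A, epoch 10
           System.out.println(cache.lastSeenEpoch("topic-1")); // 10
           cache.onUnknownTopic("topic-1");                    // 2. UNKNOWN_TOPIC error
           System.out.println(cache.lastSeenEpoch("topic-1")); // 10
           cache.onMetadata("topic-1", UUID.randomUUID(), 0);  // 3. topic ID B, epoch 0
           System.out.println(cache.lastSeenEpoch("topic-1")); // 0
       }
   }
   ```
   
   With the pre-fix condition (which also required a non-null old topic ID), step 3 in this model would fall through to the epoch comparison and keep the value 10.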
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.
+            if (topicId != null && !topicId.equals(oldTopicId)) {
                 // If both topic IDs were valid and the topic ID changed, update the metadata

Review comment:
       nit: this comment is now slightly inaccurate. 

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
       Out of curiosity, why does this need a new class (as opposed to adding to `PlaintextProducerTest` for example)?







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759880730



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.

Review comment:
       Ack. Changed the comment as per the suggestion.
   
   > would it make sense to move this into the corresponding branch that it applies to?
   
   Sorry, I didn't follow this. Could you elaborate, please? (Do you mean a separate `if` branch? The current `if` branch handles the case where the topicId differs, so that seems to be the one we should modify as part of this patch.)







[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r770362464



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -381,13 +381,12 @@ public void testEpochUpdateOnChangedTopicIds() {
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
         // Start with a topic with no topic ID
-        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
 
-        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
-        // Don't update to an older one
-        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        // If the Older topic Id is null, we should go with the new TopicId as the leader Epoch

Review comment:
       nit: `Older` -> `older`; `topic Id` -> `topic ID`; `TopicId` -> `topic ID`; `Epoch` -> `epoch`.







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759881263



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -381,13 +381,12 @@ public void testEpochUpdateOnChangedTopicIds() {
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
 
         // Start with a topic with no topic ID
-        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
         metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
-        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
 
-        // We should treat an added topic ID as though it is the same topic. Handle only when epoch increases.
-        // Don't update to an older one
-        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1, topicIds);
+        // If the Older topic Id is null, we should go with the new TopicId as the leader Epoch

Review comment:
       Added a new test case with the suggested flow. The old test case still had to change because it fails with the new behavior, so I modified it to match the new changes.










[GitHub] [kafka] dajac commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-995594632


   @prat0318 Thanks for the update. I left a few more nits to fix typos. Would you have time to quickly address them?





[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769903291



##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+

Review comment:
       removed







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759880730



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,7 +394,8 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // oldTopicId can be null (when metadata is fetched during topic recreation), update the metadata in that case as well.
+            if (topicId != null && !topicId.equals(oldTopicId)) {
                 // If both topic IDs were valid and the topic ID changed, update the metadata

Review comment:
       updated.







[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r770355018



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,30 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());

Review comment:
       nit: We could have kept the comment before the block to be consistent with the other two blocks. I just wanted to rework it a bit.







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769917809



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());

Review comment:
       ack.







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769901576



##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+

Review comment:
       removed.

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>
+            val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+            assertEquals(topic, resp.topic())
+        }
+
+        // start topic deletion
+        adminZkClient.deleteTopic(topic)
+
+        // Verify that the topic is deleted when no metadata request comes in
+        TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+        // Producer should be able to send messages even after topic gets deleted and auto-created
+        assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())

Review comment:
       ack







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r759882210



##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
       I initially considered putting it in `PlaintextProducerTest`, but the new case needs a replication factor of 2 so that reassignments can take place (to increase the leader epoch), and it needs `AutoLeaderRebalance` disabled. Modifying the configs for the whole test class would affect the other tests, so I decided to go for a new test class.







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769901214



##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>
+            val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+            assertEquals(topic, resp.topic())
+        }
+
+        // start topic deletion
+        adminZkClient.deleteTopic(topic)
+
+        // Verify that the topic is deleted when no metadata request comes in
+        TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+        // Producer should be able to send messages even after topic gets deleted and auto-created
+        assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())
+    }
+
+

Review comment:
       removed.







[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769761183



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), new HashMap<>());

Review comment:
       nit: Could we use `Collections.emptyMap()`?
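
       As a small illustration of this nit, the sketch below compares `new HashMap<>()` with `Collections.emptyMap()` for an argument that is only read. The class name `EmptyMapSketch` and the helper `countEntries` are hypothetical and exist only for this example; they are not part of the Kafka test utilities.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical example class, not part of the Kafka code base.
public class EmptyMapSketch {

    // Stand-in for a helper that only reads the map it is given.
    static int countEntries(Map<String, Integer> partitionCounts) {
        return partitionCounts.size();
    }

    // Returns true if the map rejects modification.
    static boolean isReadOnly(Map<String, Integer> map) {
        try {
            map.put("topic-1", 1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> allocated = new HashMap<>();
        Map<String, Integer> shared = Collections.emptyMap();

        // Both behave the same for a read-only consumer.
        System.out.println(countEntries(allocated) == countEntries(shared));
        System.out.println(allocated.equals(shared));

        // The shared empty map is immutable, which documents the intent
        // that the callee must not add entries to it.
        System.out.println(isReadOnly(shared));
    }
}
```

       For a read-only argument the two are interchangeable; `Collections.emptyMap()` avoids an allocation and makes the "nothing here" intent explicit.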

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.

Review comment:
       nit: Could we start all comments with a capital letter to be consistent?

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value

Review comment:
       nit: Should `return` be removed?

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>
+            val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+            assertEquals(topic, resp.topic())
+        }
+
+        // start topic deletion
+        adminZkClient.deleteTopic(topic)
+
+        // Verify that the topic is deleted when no metadata request comes in
+        TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+        // Producer should be able to send messages even after topic gets deleted and auto-created
+        assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())

Review comment:
       nit: The parentheses around `value` could be removed.

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,8 +394,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
-                // If both topic IDs were valid and the topic ID changed, update the metadata
+            // Between the time that a topic is deleted and re-created, the client may lose
+            // track of the corresponding topicId (i.e. `oldTopicId` will be null). In this case,
+            // when we discover the new topicId, we allow the corresponding leader epoch
+            // to override the last seen value.

Review comment:
       I think that @hachikuji wanted to move this comment block into the `if` branch below, perhaps right after `If the new topic ID is valid and different from the last seen topic ID, update the metadata`.
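
       To make the placement concrete, here is a minimal, self-contained sketch of the branch under discussion. It is not the actual `Metadata` class: the class name `LeaderEpochCacheSketch`, its methods, the string partition key, and the use of `java.util.UUID` in place of Kafka's `Uuid` are all assumptions made for the example. The explanatory comment sits inside the `if` branch it applies to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified stand-in for the client-side cache; not the real Metadata class.
public class LeaderEpochCacheSketch {
    // Last seen topic ID per topic name. The entry is dropped when the topic is deleted.
    private final Map<String, UUID> topicIds = new HashMap<>();
    // Last seen leader epoch per partition. This entry survives topic deletion.
    private final Map<String, Integer> lastSeenLeaderEpochs = new HashMap<>();

    // Applies a metadata update for one partition and returns true if it was accepted.
    public boolean update(String topic, int partition, UUID topicId, int newEpoch) {
        String tp = topic + "-" + partition;
        UUID oldTopicId = topicIds.get(topic);
        Integer currentEpoch = lastSeenLeaderEpochs.get(tp);

        boolean accepted;
        if (topicId != null && !topicId.equals(oldTopicId)) {
            // If the new topic ID is valid and different from the last seen topic ID,
            // update the metadata. Between the time that a topic is deleted and
            // re-created, the client may lose track of the corresponding topic ID
            // (oldTopicId is null). In that case the new leader epoch is allowed to
            // override the last seen value, even if it is lower.
            accepted = true;
        } else if (currentEpoch == null || newEpoch >= currentEpoch) {
            // Same topic: accept only epochs that do not go backwards.
            accepted = true;
        } else {
            // Stale update: keep the previously cached epoch.
            accepted = false;
        }

        if (accepted) {
            lastSeenLeaderEpochs.put(tp, newEpoch);
            if (topicId != null) {
                topicIds.put(topic, topicId);
            }
        }
        return accepted;
    }

    // Simulates the topic ID being evicted after the topic is deleted.
    public void forgetTopicId(String topic) {
        topicIds.remove(topic);
    }

    public Integer lastSeenLeaderEpoch(String topic, int partition) {
        return lastSeenLeaderEpochs.get(topic + "-" + partition);
    }

    public static void main(String[] args) {
        LeaderEpochCacheSketch cache = new LeaderEpochCacheSketch();
        cache.update("topic-1", 0, UUID.randomUUID(), 10);
        cache.forgetTopicId("topic-1"); // topic deleted, epoch 10 is still cached
        boolean accepted = cache.update("topic-1", 0, UUID.randomUUID(), 0); // topic re-created
        System.out.println("accepted=" + accepted + ", epoch=" + cache.lastSeenLeaderEpoch("topic-1", 0));
    }
}
```

       With the earlier condition (`oldTopicId != null` required), the re-created topic's epoch 0 would fall through to the epoch comparison and be rejected against the cached epoch 10, which matches the failure described in KAFKA-13488.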

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo

Review comment:
       nit: `foo` reads a bit weird here. I would just remove it. Similarly, `topic A` is weird because the topic is named `topic-1`.

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>

Review comment:
       nit: Using `foreach` instead of `map` would be better here.

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+

Review comment:
       nit: Empty line could be removed.

##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());

Review comment:
       nit: Would it make sense to move this line right after `Start with a topic A with a topic ID foo` as it goes together with the metadata created there?

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+    override def generateConfigs = {
+        val overridingProps = new Properties()
+        val numServers = 2
+        overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+        overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+        overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+        TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+                trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+    }
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+
+        // create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+        zkClient.createPartitionReassignment(reassignment)
+        TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+            "failed to remove reassign partitions path after completion")
+
+        val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+        (1 to numRecords).map { i =>
+            val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+            assertEquals(topic, resp.topic())
+        }
+
+        // start topic deletion
+        adminZkClient.deleteTopic(topic)
+
+        // Verify that the topic is deleted when no metadata request comes in
+        TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+        // Producer should be able to send messages even after topic gets deleted and auto-created
+        assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())
+    }
+
+

Review comment:
       nit: One empty line could be removed here as well.

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
       It might be better to extend `IntegrationTestHarness` instead of `BaseProducerSendTest` in this case. `BaseProducerSendTest` contains unit tests that will be run as well at the moment. I don't think that we have to run them with this configuration, or do we?

##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+

Review comment:
       nit: One empty line could be removed.







[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r770355553



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,30 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic topic-1 is now deleted so Response contains an Error. LeaderEpoch should still maintain Old value
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap());
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Create topic-1 again but this time with a topic ID bar. LeaderEpoch should be updated to new even if lower.

Review comment:
       nit: `Create topic-1 again but this time with a topic ID.`?







[GitHub] [kafka] prat0318 commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-986920504


   @hachikuji Gentle bump on the review. PTAL.





[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769917597



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -394,8 +394,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
-                // If both topic IDs were valid and the topic ID changed, update the metadata
+            // Between the time that a topic is deleted and re-created, the client may lose
+            // track of the corresponding topicId (i.e. `oldTopicId` will be null). In this case,
+            // when we discover the new topicId, we allow the corresponding leader epoch
+            // to override the last seen value.

Review comment:
       moved as per the suggestion.
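
The behaviour described in the code comment quoted above can be sketched roughly as follows. This is a simplified model for readers of the thread, not the actual `Metadata` class: the class `LeaderEpochSketch` and its methods are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and state is tracked per topic name rather than per partition. The point it illustrates is that a new topic ID is allowed to override the last seen leader epoch even when the old topic ID is no longer cached.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified model of the epoch check discussed in this PR.
// It is NOT the real org.apache.kafka.clients.Metadata implementation.
final class LeaderEpochSketch {
    // Cached topic IDs and last seen leader epochs, keyed by topic name (assumption: one partition per topic).
    private final Map<String, UUID> topicIds = new HashMap<>();
    private final Map<String, Integer> lastSeenEpochs = new HashMap<>();

    // Models a metadata response that reports the topic as unknown:
    // the cached topic ID is dropped, but the last seen epoch is kept.
    void topicDeleted(String topic) {
        topicIds.remove(topic);
    }

    // Returns true if the update is accepted, false if it is ignored as stale.
    boolean update(String topic, UUID newTopicId, int newEpoch) {
        UUID oldTopicId = topicIds.get(topic);
        Integer currentEpoch = lastSeenEpochs.get(topic);
        // The old ID may be null after a deletion; a non-null new ID that differs
        // from it (including from null) is treated as a re-created topic.
        boolean topicIdChanged = newTopicId != null && !newTopicId.equals(oldTopicId);
        if (topicIdChanged || currentEpoch == null || newEpoch >= currentEpoch) {
            if (newTopicId != null) {
                topicIds.put(topic, newTopicId);
            }
            lastSeenEpochs.put(topic, newEpoch);
            return true;
        }
        return false;
    }

    Integer lastSeenEpoch(String topic) {
        return lastSeenEpochs.get(topic);
    }

    public static void main(String[] args) {
        LeaderEpochSketch metadata = new LeaderEpochSketch();
        metadata.update("topic-1", UUID.randomUUID(), 10);
        metadata.topicDeleted("topic-1");
        // Re-created topic: new ID, lower epoch.
        boolean accepted = metadata.update("topic-1", UUID.randomUUID(), 0);
        System.out.println("accepted=" + accepted + ", epoch=" + metadata.lastSeenEpoch("topic-1"));
    }
}
```

In this model, requiring a non-null old topic ID before treating the ID as changed would leave the stale epoch in place after a delete and re-create, which is the failure mode KAFKA-13488 describes.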







[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769918057



##########
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##########
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
         assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
     }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo

Review comment:
       agreed, modified accordingly.







[GitHub] [kafka] dajac commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-995910977


   Merged to trunk, 3.1, 3.0 and 2.8.





[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r770363133



##########
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##########
@@ -0,0 +1,83 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
+    val producerCount: Int = 1
+    val brokerCount: Int = 2
+
+    serverConfig.put(KafkaConfig.NumPartitionsProp, 2.toString)
+    serverConfig.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+    serverConfig.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+
+    producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString)
+    producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000.toString)
+    producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000.toString)
+
+    /**
+     * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+     *
+     * Producer will attempt to send messages to the partition specified in each record, and should
+     * succeed as long as the partition is included in the metadata.
+     */
+    @Test
+    def testSendWithTopicDeletionMidWay(): Unit = {
+        val numRecords = 10
+        val topic = "topic"
+
+        // Create topic with leader as 0 for the 2 partitions.
+        createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+        val reassignment = Map(
+            new TopicPartition(topic, 0) -> Seq(1, 0),
+            new TopicPartition(topic, 1) -> Seq(1, 0)
+        )
+
+        // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1

Review comment:
       nit: `Epoch` -> `epoch`.







[GitHub] [kafka] prat0318 commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-995596844


   > @prat0318 Thanks for the update. I left a few more nits to fix typos. Would you have time to quickly address them?
   
   @dajac Thanks again for the review. I have addressed the suggestions. Please re-review.





[GitHub] [kafka] prat0318 commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

Posted by GitBox <gi...@apache.org>.
prat0318 commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-985139109


   @hachikuji Made the suggested changes. Please review.

