You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/10/20 01:59:27 UTC

[kafka] branch trunk updated: KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode (#14573)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 486d5f6c641 KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode (#14573)
486d5f6c641 is described below

commit 486d5f6c641885f8fb8babca795b55f5ca0c973e
Author: Gantigmaa Selenge <39...@users.noreply.github.com>
AuthorDate: Fri Oct 20 02:59:21 2023 +0100

    KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode (#14573)
    
    Fixed some of the failing tests in FetchRequestTest.
    
    testFetchWithPartitionsWithIdError and testCreateIncrementalFetchWithPartitionsInErrorV12 fail when enabled with KRaft mode (the error output is not reproduced in this message). These tests fail only intermittently when run locally but failed consistently when run in the Jenkins pipeline.
    
    These tests now call the utility function TestUtils.waitUntilLeaderIsKnown after creating the topic partitions, so that they wait for the logs to be created on the leader before sending fetch requests.
    
    Enabled all tests except checkLastFetchedEpochValidation with KRaft mode.
    According to the Jenkins build history, every test other than these two and checkLastFetchedEpochValidation was passing when enabled with KRaft mode. Those tests have therefore been re-enabled with KRaft mode, while checkLastFetchedEpochValidation is left disabled for further investigation.
    
    Reviewers: Luke Chen <sh...@gmail.com>, dengziming <de...@gmail.com>
---
 .../scala/unit/kafka/server/FetchRequestTest.scala | 31 +++++++++++++---------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 6cce93f579d..a763eddf2b4 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -43,7 +43,7 @@ import scala.util.Random
 class FetchRequestTest extends BaseFetchRequestTest {
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testBrokerRespectsPartitionsOrderAndSizeLimits(quorum: String): Unit = {
     initProducer()
 
@@ -146,7 +146,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testFetchRequestV4WithReadCommitted(quorum: String): Unit = {
     initProducer()
     val maxPartitionBytes = 200
@@ -165,7 +165,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testFetchRequestToNonReplica(quorum: String): Unit = {
     val topic = "topic"
     val partition = 0
@@ -250,13 +250,13 @@ class FetchRequestTest extends BaseFetchRequestTest {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testCurrentEpochValidation(quorum: String): Unit = {
     checkCurrentEpochValidation(ApiKeys.FETCH.latestVersion())
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testCurrentEpochValidationV12(quorum: String): Unit = {
     checkCurrentEpochValidation(12)
   }
@@ -300,13 +300,13 @@ class FetchRequestTest extends BaseFetchRequestTest {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testEpochValidationWithinFetchSession(quorum: String): Unit = {
     checkEpochValidationWithinFetchSession(ApiKeys.FETCH.latestVersion())
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testEpochValidationWithinFetchSessionV12(quorum: String): Unit = {
     checkEpochValidationWithinFetchSession(12)
   }
@@ -368,7 +368,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
    * channel is muted in the server. If buffers are not released this will result in OOM.
    */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testDownConversionWithConnectionFailure(quorum: String): Unit = {
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
     val topicIds = getTopicIds().asJava
@@ -436,7 +436,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     * some records have to be dropped during the conversion.
     */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testDownConversionFromBatchedToUnbatchedRespectsOffset(quorum: String): Unit = {
     // Increase linger so that we have control over the batches created
     producer = TestUtils.createProducer(bootstrapServers(),
@@ -518,7 +518,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
    * This tests using FetchRequests that don't use topic IDs
    */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testCreateIncrementalFetchWithPartitionsInErrorV12(quorum: String): Unit = {
     def createConsumerFetchRequest(topicPartitions: Seq[TopicPartition],
                            metadata: JFetchMetadata,
@@ -533,6 +533,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
     // topicNames can be empty because we are using old requests
     val topicNames = Map[Uuid, String]().asJava
     createTopicWithAssignment("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
+    TestUtils.waitUntilLeaderIsKnown(brokers, foo0)
+    TestUtils.waitUntilLeaderIsKnown(brokers, foo1)
     val bar0 = new TopicPartition("bar", 0)
     val req1 = createConsumerFetchRequest(List(foo0, foo1, bar0), JFetchMetadata.INITIAL, Nil)
     val resp1 = sendFetchRequest(0, req1)
@@ -557,6 +559,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertTrue(responseData2.containsKey(bar0))
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, responseData2.get(bar0).errorCode)
     createTopicWithAssignment("bar", Map(0 -> List(0, 1)))
+    TestUtils.waitUntilLeaderIsKnown(brokers, bar0)
     val req3 = createConsumerFetchRequest(Nil, new JFetchMetadata(resp1.sessionId(), 2), Nil)
     val resp3 = sendFetchRequest(0, req3)
     assertEquals(Errors.NONE, resp3.error())
@@ -578,7 +581,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
    * Test that when a Fetch Request receives an unknown topic ID, it returns a top level error.
    */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testFetchWithPartitionsWithIdError(quorum: String): Unit = {
     def createConsumerFetchRequest(fetchData: util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData],
                            metadata: JFetchMetadata,
@@ -592,6 +595,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
     val foo0 = new TopicPartition("foo", 0)
     val foo1 = new TopicPartition("foo", 1)
     createTopicWithAssignment("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
+    TestUtils.waitUntilLeaderIsKnown(brokers, foo0)
+    TestUtils.waitUntilLeaderIsKnown(brokers, foo1)
     val topicIds = getTopicIds()
     val topicIdsWithUnknown = topicIds ++ Map("bar" -> Uuid.randomUuid())
     val bar0 = new TopicPartition("bar", 0)
@@ -621,7 +626,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testZStdCompressedTopic(quorum: String): Unit = {
     // ZSTD compressed topic
     val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> BrokerCompressionType.ZSTD.name)
@@ -669,7 +674,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk","kraft"))
   def testZStdCompressedRecords(quorum: String): Unit = {
     // Producer compressed topic
     val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> BrokerCompressionType.PRODUCER.name)