Posted to commits@kafka.apache.org by cm...@apache.org on 2022/05/14 00:29:53 UTC

[kafka] branch trunk updated: MINOR: convert some tests to KRaft (#12155)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a3e0af94f28 MINOR: convert some tests to KRaft (#12155)
a3e0af94f28 is described below

commit a3e0af94f2860a8d997017dcc2319396e62ff87f
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri May 13 17:29:47 2022 -0700

    MINOR: convert some tests to KRaft (#12155)
    
    Convert EndToEndClusterIdTest, ConsumerGroupCommandTest,
    ListConsumerGroupTest, and LogOffsetTest to test KRaft mode.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, dengziming <de...@gmail.com>
---
 .../kafka/api/EndToEndClusterIdTest.scala          | 11 +--
 .../kafka/admin/ConsumerGroupCommandTest.scala     |  2 +-
 .../unit/kafka/admin/ListConsumerGroupTest.scala   | 25 ++++---
 .../scala/unit/kafka/server/LogOffsetTest.scala    | 78 +++++++++++++---------
 4 files changed, 69 insertions(+), 47 deletions(-)
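
The conversion applied throughout the patch follows one mechanical pattern, sketched below. This is an illustrative, self-contained example rather than an excerpt from the diff; the class and method names (QuorumConversionSketch, testSomething) are invented for illustration, and the note about the display name reflects how Kafka's test harness is generally understood to select the quorum type.

    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    class QuorumConversionSketch {
      // Previously a plain "@Test def testSomething(): Unit" always ran against a
      // ZooKeeper-backed cluster. After the conversion, the same test body runs once
      // per quorum type; the harness is expected to pick up the "zk"/"kraft" value
      // (surfaced through the JUnit display name) to decide which cluster to start.
      @ParameterizedTest
      @ValueSource(strings = Array("zk", "kraft"))
      def testSomething(quorum: String): Unit = {
        // The test body itself rarely reads `quorum` directly.
        assert(quorum == "zk" || quorum == "kraft")
      }
    }

The diff below also swaps ZooKeeper-only harness accessors for quorum-agnostic ones (for example zkConnect becomes zkConnectOrNull, and the KafkaServer-typed `server` helper becomes a KafkaBroker-typed `broker`), so the same test code works whether the cluster is backed by ZooKeeper or KRaft.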

diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 2492903f535..25f7ce6a8c0 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -29,10 +29,12 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition}
 import org.apache.kafka.test.{TestUtils => _, _}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 import scala.jdk.CollectionConverters._
 import org.apache.kafka.test.TestUtils.isValidClusterId
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 /** The test cases here verify the following conditions.
   * 1. The ProducerInterceptor receives the cluster id after the onSend() method is called and before onAcknowledgement() method is called.
@@ -99,7 +101,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, classOf[MockBrokerMetricsReporter].getName)
 
   override def generateConfigs = {
-    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
     cfgs.foreach(_ ++= serverConfig)
     cfgs.map(KafkaConfig.fromProps)
@@ -113,8 +115,9 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
     createTopic(topic, 2, serverCount)
   }
 
-  @Test
-  def testEndToEnd(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testEndToEnd(quorum: String): Unit = {
     val appendStr = "mock"
     MockConsumerInterceptor.resetCounters()
     MockProducerInterceptor.resetCounters()
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 571f2dbf4d7..6851ba2d476 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -46,7 +46,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
 
   // configure the servers and clients
   override def generateConfigs = {
-    TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
+    TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false).map { props =>
       KafkaConfig.fromProps(props)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 11fd0a3b1f2..4e7575e797c 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -18,16 +18,19 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
 import kafka.utils.TestUtils
 import org.apache.kafka.common.ConsumerGroupState
 import org.apache.kafka.clients.admin.ConsumerGroupListing
 import java.util.Optional
 
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @Test
-  def testListConsumerGroups(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListConsumerGroups(quorum: String): Unit = {
     val simpleGroup = "simple-group"
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
@@ -43,13 +46,15 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
     }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
   }
 
-  @Test
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
   def testListWithUnrecognizedNewConsumerOption(): Unit = {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list")
     assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs))
   }
 
-  @Test
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
   def testListConsumerGroupsWithStates(): Unit = {
     val simpleGroup = "simple-group"
     addSimpleGroupExecutor(group = simpleGroup)
@@ -78,8 +83,9 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
     }, s"Expected to show groups $expectedListingStable, but found $foundListing")
   }
 
-  @Test
-  def testConsumerGroupStatesFromString(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsumerGroupStatesFromString(quorum: String): Unit = {
     var result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable")
     assertEquals(Set(ConsumerGroupState.STABLE), result)
 
@@ -98,8 +104,9 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
     assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @Test
-  def testListGroupCommand(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListGroupCommand(quorum: String): Unit = {
     val simpleGroup = "simple-group"
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 51f75b2ded0..a9d765c5cd8 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -25,14 +25,15 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.Mockito.{mock, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
-
 import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Optional, Properties, Random}
+
 import scala.collection.mutable.Buffer
 import scala.jdk.CollectionConverters._
 
@@ -53,8 +54,9 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @deprecated("ListOffsetsRequest V0", since = "")
-  @Test
-  def testGetOffsetsForUnknownTopic(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition("foo", 0)
     val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
       .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
@@ -63,8 +65,9 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @deprecated("ListOffsetsRequest V0", since = "")
-  @Test
-  def testGetOffsetsAfterDeleteRecords(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testGetOffsetsAfterDeleteRecords(quorum: String): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
     val log = createTopicAndGetLog(topic, topicPartition)
@@ -80,7 +83,7 @@ class LogOffsetTest extends BaseRequestTest {
     val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), offsets)
 
-    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, broker),
       "Leader should be elected")
     val request = ListOffsetsRequest.Builder.forReplica(0, 0)
       .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 15).asJava).build()
@@ -88,8 +91,9 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
-  @Test
-  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(quorum: String): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
     val log = createTopicAndGetLog(topic, topicPartition)
@@ -111,8 +115,9 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(-1L, secondOffset.get.timestamp)
   }
 
-  @Test
-  def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(quorum: String): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
     val log = createTopicAndGetLog(topic, topicPartition)
@@ -129,13 +134,14 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(6L, maxTimestampOffset.get.timestamp)
   }
 
-  @Test
-  def testGetOffsetsBeforeLatestTime(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testGetOffsetsBeforeLatestTime(quorum: String): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
     val log = createTopicAndGetLog(topic, topicPartition)
 
-    val topicIds = getTopicIds().asJava
+    val topicIds = getTopicIds(Seq("kafka-")).asJava
     val topicNames = topicIds.asScala.map(_.swap).asJava
     val topicId = topicIds.get(topic)
 
@@ -146,7 +152,7 @@ class LogOffsetTest extends BaseRequestTest {
     val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 
-    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, 0, server),
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, 0, broker),
       "Leader should be elected")
     val request = ListOffsetsRequest.Builder.forReplica(0, 0)
       .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 15).asJava).build()
@@ -161,8 +167,9 @@ class LogOffsetTest extends BaseRequestTest {
     assertFalse(FetchResponse.recordsOrFail(fetchResponse.responseData(topicNames, ApiKeys.FETCH.latestVersion).get(topicPartition)).batches.iterator.hasNext)
   }
 
-  @Test
-  def testEmptyLogsGetOffsets(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testEmptyLogsGetOffsets(quorum: String): Unit = {
     val random = new Random
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, random.nextInt(10))
@@ -184,8 +191,9 @@ class LogOffsetTest extends BaseRequestTest {
     assertFalse(offsetChanged)
   }
 
-  @Test
-  def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(quorum: String): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
     val log = createTopicAndGetLog(topic, topicPartition)
@@ -199,15 +207,16 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @deprecated("legacyFetchOffsetsBefore", since = "")
-  @Test
-  def testGetOffsetsBeforeNow(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testGetOffsetsBeforeNow(quorum: String): Unit = {
     val random = new Random
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, random.nextInt(3))
 
     createTopic(topic, 3, 1)
 
-    val logManager = server.getLogManager
+    val logManager = broker.logManager
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
 
     for (_ <- 0 until 20)
@@ -219,7 +228,7 @@ class LogOffsetTest extends BaseRequestTest {
     val offsets = log.legacyFetchOffsetsBefore(now, 15)
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
 
-    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, broker),
       "Leader should be elected")
     val request = ListOffsetsRequest.Builder.forReplica(0, 0)
       .setTargetTimes(buildTargetTimes(topicPartition, now, 15).asJava).build()
@@ -228,15 +237,16 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @deprecated("legacyFetchOffsetsBefore", since = "")
-  @Test
-  def testGetOffsetsBeforeEarliestTime(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testGetOffsetsBeforeEarliestTime(quorum: String): Unit = {
     val random = new Random
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, random.nextInt(3))
 
     createTopic(topic, 3, 1)
 
-    val logManager = server.getLogManager
+    val logManager = broker.logManager
     val log = logManager.getOrCreateLog(topicPartition, topicId = None)
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -246,7 +256,7 @@ class LogOffsetTest extends BaseRequestTest {
 
     assertEquals(Seq(0L), offsets)
 
-    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, broker),
       "Leader should be elected")
     val request = ListOffsetsRequest.Builder.forReplica(0, 0)
       .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.EARLIEST_TIMESTAMP, 10).asJava).build()
@@ -256,8 +266,9 @@ class LogOffsetTest extends BaseRequestTest {
 
   /* We test that `fetchOffsetsBefore` works correctly if `LogSegment.size` changes after each invocation (simulating
    * a race condition) */
-  @Test
-  def testFetchOffsetsBeforeWithChangingSegmentSize(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFetchOffsetsBeforeWithChangingSegmentSize(quorum: String): Unit = {
     val log: UnifiedLog = mock(classOf[UnifiedLog])
     val logSegment: LogSegment = mock(classOf[LogSegment])
     when(logSegment.size).thenAnswer(new Answer[Int] {
@@ -271,8 +282,9 @@ class LogOffsetTest extends BaseRequestTest {
 
   /* We test that `fetchOffsetsBefore` works correctly if `Log.logSegments` content and size are
    * different (simulating a race condition) */
-  @Test
-  def testFetchOffsetsBeforeWithChangingSegments(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFetchOffsetsBeforeWithChangingSegments(quorum: String): Unit = {
     val log: UnifiedLog = mock(classOf[UnifiedLog])
     val logSegment: LogSegment = mock(classOf[LogSegment])
     when(log.logSegments).thenReturn(
@@ -284,7 +296,7 @@ class LogOffsetTest extends BaseRequestTest {
     log.legacyFetchOffsetsBefore(System.currentTimeMillis, 100)
   }
 
-  private def server: KafkaServer = servers.head
+  private def broker: KafkaBroker = brokers.head
 
   private def sendListOffsetsRequest(request: ListOffsetsRequest): ListOffsetsResponse = {
     connectAndReceive[ListOffsetsResponse](request)
@@ -312,7 +324,7 @@ class LogOffsetTest extends BaseRequestTest {
   private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): UnifiedLog = {
     createTopic(topic, 1, 1)
 
-    val logManager = server.getLogManager
+    val logManager = broker.logManager
     TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
       "Log for partition [topic,0] should be created")
     logManager.getLog(topicPartition).get