You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/05/14 00:29:53 UTC
[kafka] branch trunk updated: MINOR: convert some tests to KRaft (#12155)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a3e0af94f28 MINOR: convert some tests to KRaft (#12155)
a3e0af94f28 is described below
commit a3e0af94f2860a8d997017dcc2319396e62ff87f
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri May 13 17:29:47 2022 -0700
MINOR: convert some tests to KRaft (#12155)
Convert EndToEndClusterIdTest, ConsumerGroupCommandTest,
ListConsumerGroupTest, and LogOffsetTest so that they run in both ZooKeeper and KRaft modes.
Reviewers: Jason Gustafson <ja...@confluent.io>, dengziming <de...@gmail.com>
---
.../kafka/api/EndToEndClusterIdTest.scala | 11 +--
.../kafka/admin/ConsumerGroupCommandTest.scala | 2 +-
.../unit/kafka/admin/ListConsumerGroupTest.scala | 25 ++++---
.../scala/unit/kafka/server/LogOffsetTest.scala | 78 +++++++++++++---------
4 files changed, 69 insertions(+), 47 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 2492903f535..25f7ce6a8c0 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -29,10 +29,12 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition}
import org.apache.kafka.test.{TestUtils => _, _}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
import scala.jdk.CollectionConverters._
import org.apache.kafka.test.TestUtils.isValidClusterId
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
/** The test cases here verify the following conditions.
* 1. The ProducerInterceptor receives the cluster id after the onSend() method is called and before onAcknowledgement() method is called.
@@ -99,7 +101,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, classOf[MockBrokerMetricsReporter].getName)
override def generateConfigs = {
- val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+ val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)
@@ -113,8 +115,9 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
createTopic(topic, 2, serverCount)
}
- @Test
- def testEndToEnd(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testEndToEnd(quorum: String): Unit = {
val appendStr = "mock"
MockConsumerInterceptor.resetCounters()
MockProducerInterceptor.resetCounters()
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 571f2dbf4d7..6851ba2d476 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -46,7 +46,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
// configure the servers and clients
override def generateConfigs = {
- TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props =>
+ TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false).map { props =>
KafkaConfig.fromProps(props)
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 11fd0a3b1f2..4e7575e797c 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -18,16 +18,19 @@ package kafka.admin
import joptsimple.OptionException
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
import kafka.utils.TestUtils
import org.apache.kafka.common.ConsumerGroupState
import org.apache.kafka.clients.admin.ConsumerGroupListing
import java.util.Optional
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @Test
- def testListConsumerGroups(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListConsumerGroups(quorum: String): Unit = {
val simpleGroup = "simple-group"
addSimpleGroupExecutor(group = simpleGroup)
addConsumerGroupExecutor(numConsumers = 1)
@@ -43,13 +46,15 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
}, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
}
- @Test
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
def testListWithUnrecognizedNewConsumerOption(): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", bootstrapServers(), "--list")
assertThrows(classOf[OptionException], () => getConsumerGroupService(cgcArgs))
}
- @Test
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
def testListConsumerGroupsWithStates(): Unit = {
val simpleGroup = "simple-group"
addSimpleGroupExecutor(group = simpleGroup)
@@ -78,8 +83,9 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
}, s"Expected to show groups $expectedListingStable, but found $foundListing")
}
- @Test
- def testConsumerGroupStatesFromString(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConsumerGroupStatesFromString(quorum: String): Unit = {
var result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable")
assertEquals(Set(ConsumerGroupState.STABLE), result)
@@ -98,8 +104,9 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString(" , ,"))
}
- @Test
- def testListGroupCommand(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListGroupCommand(quorum: String): Unit = {
val simpleGroup = "simple-group"
addSimpleGroupExecutor(group = simpleGroup)
addConsumerGroupExecutor(numConsumers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 51f75b2ded0..a9d765c5cd8 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -25,14 +25,15 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
-
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Optional, Properties, Random}
+
import scala.collection.mutable.Buffer
import scala.jdk.CollectionConverters._
@@ -53,8 +54,9 @@ class LogOffsetTest extends BaseRequestTest {
}
@deprecated("ListOffsetsRequest V0", since = "")
- @Test
- def testGetOffsetsForUnknownTopic(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
val topicPartition = new TopicPartition("foo", 0)
val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
@@ -63,8 +65,9 @@ class LogOffsetTest extends BaseRequestTest {
}
@deprecated("ListOffsetsRequest V0", since = "")
- @Test
- def testGetOffsetsAfterDeleteRecords(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testGetOffsetsAfterDeleteRecords(quorum: String): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@@ -80,7 +83,7 @@ class LogOffsetTest extends BaseRequestTest {
val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), offsets)
- TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, broker),
"Leader should be elected")
val request = ListOffsetsRequest.Builder.forReplica(0, 0)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 15).asJava).build()
@@ -88,8 +91,9 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
}
- @Test
- def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(quorum: String): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@@ -111,8 +115,9 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(-1L, secondOffset.get.timestamp)
}
- @Test
- def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(quorum: String): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@@ -129,13 +134,14 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(6L, maxTimestampOffset.get.timestamp)
}
- @Test
- def testGetOffsetsBeforeLatestTime(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testGetOffsetsBeforeLatestTime(quorum: String): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
- val topicIds = getTopicIds().asJava
+ val topicIds = getTopicIds(Seq("kafka-")).asJava
val topicNames = topicIds.asScala.map(_.swap).asJava
val topicId = topicIds.get(topic)
@@ -146,7 +152,7 @@ class LogOffsetTest extends BaseRequestTest {
val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
- TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, 0, server),
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, 0, broker),
"Leader should be elected")
val request = ListOffsetsRequest.Builder.forReplica(0, 0)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 15).asJava).build()
@@ -161,8 +167,9 @@ class LogOffsetTest extends BaseRequestTest {
assertFalse(FetchResponse.recordsOrFail(fetchResponse.responseData(topicNames, ApiKeys.FETCH.latestVersion).get(topicPartition)).batches.iterator.hasNext)
}
- @Test
- def testEmptyLogsGetOffsets(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testEmptyLogsGetOffsets(quorum: String): Unit = {
val random = new Random
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, random.nextInt(10))
@@ -184,8 +191,9 @@ class LogOffsetTest extends BaseRequestTest {
assertFalse(offsetChanged)
}
- @Test
- def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(quorum: String): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
val log = createTopicAndGetLog(topic, topicPartition)
@@ -199,15 +207,16 @@ class LogOffsetTest extends BaseRequestTest {
}
@deprecated("legacyFetchOffsetsBefore", since = "")
- @Test
- def testGetOffsetsBeforeNow(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testGetOffsetsBeforeNow(quorum: String): Unit = {
val random = new Random
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, random.nextInt(3))
createTopic(topic, 3, 1)
- val logManager = server.getLogManager
+ val logManager = broker.logManager
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
for (_ <- 0 until 20)
@@ -219,7 +228,7 @@ class LogOffsetTest extends BaseRequestTest {
val offsets = log.legacyFetchOffsetsBefore(now, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
- TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, broker),
"Leader should be elected")
val request = ListOffsetsRequest.Builder.forReplica(0, 0)
.setTargetTimes(buildTargetTimes(topicPartition, now, 15).asJava).build()
@@ -228,15 +237,16 @@ class LogOffsetTest extends BaseRequestTest {
}
@deprecated("legacyFetchOffsetsBefore", since = "")
- @Test
- def testGetOffsetsBeforeEarliestTime(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testGetOffsetsBeforeEarliestTime(quorum: String): Unit = {
val random = new Random
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, random.nextInt(3))
createTopic(topic, 3, 1)
- val logManager = server.getLogManager
+ val logManager = broker.logManager
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -246,7 +256,7 @@ class LogOffsetTest extends BaseRequestTest {
assertEquals(Seq(0L), offsets)
- TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, broker),
"Leader should be elected")
val request = ListOffsetsRequest.Builder.forReplica(0, 0)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.EARLIEST_TIMESTAMP, 10).asJava).build()
@@ -256,8 +266,9 @@ class LogOffsetTest extends BaseRequestTest {
/* We test that `fetchOffsetsBefore` works correctly if `LogSegment.size` changes after each invocation (simulating
* a race condition) */
- @Test
- def testFetchOffsetsBeforeWithChangingSegmentSize(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFetchOffsetsBeforeWithChangingSegmentSize(quorum: String): Unit = {
val log: UnifiedLog = mock(classOf[UnifiedLog])
val logSegment: LogSegment = mock(classOf[LogSegment])
when(logSegment.size).thenAnswer(new Answer[Int] {
@@ -271,8 +282,9 @@ class LogOffsetTest extends BaseRequestTest {
/* We test that `fetchOffsetsBefore` works correctly if `Log.logSegments` content and size are
* different (simulating a race condition) */
- @Test
- def testFetchOffsetsBeforeWithChangingSegments(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFetchOffsetsBeforeWithChangingSegments(quorum: String): Unit = {
val log: UnifiedLog = mock(classOf[UnifiedLog])
val logSegment: LogSegment = mock(classOf[LogSegment])
when(log.logSegments).thenReturn(
@@ -284,7 +296,7 @@ class LogOffsetTest extends BaseRequestTest {
log.legacyFetchOffsetsBefore(System.currentTimeMillis, 100)
}
- private def server: KafkaServer = servers.head
+ private def broker: KafkaBroker = brokers.head
private def sendListOffsetsRequest(request: ListOffsetsRequest): ListOffsetsResponse = {
connectAndReceive[ListOffsetsResponse](request)
@@ -312,7 +324,7 @@ class LogOffsetTest extends BaseRequestTest {
private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): UnifiedLog = {
createTopic(topic, 1, 1)
- val logManager = server.getLogManager
+ val logManager = broker.logManager
TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
"Log for partition [topic,0] should be created")
logManager.getLog(topicPartition).get