You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/09 20:36:26 UTC
[kafka] 06/07: MINOR: Convert some junit tests to kraft (#12443)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 112294334f4a2e539c37f8ea2a6064af7443351f
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Wed Jul 27 10:38:31 2022 -0700
MINOR: Convert some junit tests to kraft (#12443)
Convert ProducerCompressionTest, MirrorMakerIntegrationTest, EdgeCaseRequestTest to kraft.
Make it explicit that ServerShutdownTest#testControllerShutdownDuringSend is ZK-only.
Reviewers: David Arthur <mu...@gmail.com>
---
.../kafka/api/ProducerCompressionTest.scala | 49 +++++++++++-----------
.../kafka/tools/MirrorMakerIntegrationTest.scala | 24 ++++++-----
.../unit/kafka/server/EdgeCaseRequestTest.scala | 32 ++++++++------
.../unit/kafka/server/ServerShutdownTest.scala | 10 +++--
4 files changed, 64 insertions(+), 51 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index ccdfe7d3d36..07d9ccb024f 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -17,19 +17,19 @@
package kafka.api.test
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
-import kafka.server.QuorumTestHarness
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.CsvSource
import java.util.{Collections, Properties}
-import scala.jdk.CollectionConverters._
class ProducerCompressionTest extends QuorumTestHarness {
@@ -37,18 +37,18 @@ class ProducerCompressionTest extends QuorumTestHarness {
private val topic = "topic"
private val numRecords = 2000
- private var server: KafkaServer = null
+ private var broker: KafkaBroker = null
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
- server = TestUtils.createServer(KafkaConfig.fromProps(props))
+ val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull)
+ broker = createBroker(new KafkaConfig(props))
}
@AfterEach
override def tearDown(): Unit = {
- TestUtils.shutdownServers(Seq(server))
+ TestUtils.shutdownServers(Seq(broker))
super.tearDown()
}
@@ -58,11 +58,18 @@ class ProducerCompressionTest extends QuorumTestHarness {
* Compressed messages should be able to be sent and consumed correctly
*/
@ParameterizedTest
- @MethodSource(Array("parameters"))
- def testCompression(compression: String): Unit = {
+ @CsvSource(value = Array(
+ "kraft,none",
+ "kraft,gzip",
+ "kraft,snappy",
+ "kraft,lz4",
+ "kraft,zstd",
+ "zk,gzip"
+ ))
+ def testCompression(quorum: String, compression: String): Unit = {
val producerProps = new Properties()
- val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(server))
+ val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
@@ -72,7 +79,13 @@ class ProducerCompressionTest extends QuorumTestHarness {
try {
// create topic
- TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
+ val admin = TestUtils.createAdminClient(Seq(broker),
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ try {
+ TestUtils.createTopicWithAdmin(admin, topic, Seq(broker))
+ } finally {
+ admin.close()
+ }
val partition = 0
// prepare the messages
@@ -103,15 +116,3 @@ class ProducerCompressionTest extends QuorumTestHarness {
}
}
}
-
-object ProducerCompressionTest {
- def parameters: java.util.stream.Stream[Arguments] = {
- Seq(
- Arguments.of("none"),
- Arguments.of("gzip"),
- Arguments.of("snappy"),
- Arguments.of("lz4"),
- Arguments.of("zstd")
- ).asJava.stream()
- }
-}
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index 4f673cdd60a..c64d25fe4e6 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -18,26 +18,27 @@ package kafka.tools
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
-
import scala.collection.Seq
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer, NoRecordsException}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.Exit
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
override def generateConfigs: Seq[KafkaConfig] =
- TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
+ TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, new Properties()))
val exited = new AtomicBoolean(false)
@@ -57,8 +58,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
}
}
- @Test
- def testCommitOffsetsThrowTimeoutException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCommitOffsetsThrowTimeoutException(quorum: String): Unit = {
val consumerProps = new Properties
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -70,8 +72,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
assertThrows(classOf[TimeoutException], () => mirrorMakerConsumer.commit())
}
- @Test
- def testCommitOffsetsRemoveNonExistentTopics(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCommitOffsetsRemoveNonExistentTopics(quorum: String): Unit = {
val consumerProps = new Properties
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -85,8 +88,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
assertTrue(mirrorMakerConsumer.offsets.isEmpty, "Offsets for non-existent topics should be removed")
}
- @Test
- def testCommaSeparatedRegex(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCommaSeparatedRegex(quorum: String): Unit = {
val topic = "new-topic"
val msg = "a test message"
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 1a383a8fbcd..1bbde3ffb6b 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -35,19 +35,20 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.ByteUtils
import org.apache.kafka.common.{TopicPartition, requests}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
class EdgeCaseRequestTest extends KafkaServerTestHarness {
def generateConfigs = {
- val props = TestUtils.createBrokerConfig(1, zkConnect)
+ val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
List(KafkaConfig.fromProps(props))
}
- private def socketServer = servers.head.socketServer
+ private def socketServer = brokers.head.socketServer
private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)))
@@ -116,8 +117,9 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
}
}
- @Test
- def testProduceRequestWithNullClientId(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testProduceRequestWithNullClientId(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
val correlationId = -1
@@ -161,23 +163,27 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode), "There should be no error")
}
- @Test
- def testHeaderOnlyRequest(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testHeaderOnlyRequest(quorum: String): Unit = {
verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, 1))
}
- @Test
- def testInvalidApiKeyRequest(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidApiKeyRequest(quorum: String): Unit = {
verifyDisconnect(requestHeaderBytes(-1, 0))
}
- @Test
- def testInvalidApiVersionRequest(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidApiVersionRequest(quorum: String): Unit = {
verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, -1))
}
- @Test
- def testMalformedHeaderRequest(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testMalformedHeaderRequest(quorum: String): Unit = {
val serializedBytes = {
// Only send apiKey and apiVersion
val buffer = ByteBuffer.allocate(
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 96aeac5fa61..70554d9427c 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
@@ -251,9 +251,11 @@ class ServerShutdownTest extends KafkaServerTestHarness {
}
// Verify that if controller is in the midst of processing a request, shutdown completes
- // without waiting for request timeout.
- @Test
- def testControllerShutdownDuringSend(): Unit = {
+ // without waiting for request timeout. Since this involves LeaderAndIsr request, it is
+ // ZK-only for now.
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk"))
+ def testControllerShutdownDuringSend(quorum: String): Unit = {
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)