You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/07/27 17:38:51 UTC

[kafka] branch trunk updated: MINOR: Convert some junit tests to kraft (#12443)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1914418c5d MINOR: Convert some junit tests to kraft (#12443)
1914418c5d is described below

commit 1914418c5ded2c318687ebaa887d72788dfb7b76
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Wed Jul 27 10:38:31 2022 -0700

    MINOR: Convert some junit tests to kraft (#12443)
    
    Convert ProducerCompressionTest, MirrorMakerIntegrationTest, EdgeCaseRequestTest to kraft.
    
    Make it explicit that ServerShutdownTest#testControllerShutdownDuringSend is ZK-only.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../kafka/api/ProducerCompressionTest.scala        | 49 +++++++++++-----------
 .../kafka/tools/MirrorMakerIntegrationTest.scala   | 24 ++++++-----
 .../unit/kafka/server/EdgeCaseRequestTest.scala    | 32 ++++++++------
 .../unit/kafka/server/ServerShutdownTest.scala     | 10 +++--
 4 files changed, 64 insertions(+), 51 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index ccdfe7d3d3..07d9ccb024 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -17,19 +17,19 @@
 
 package kafka.api.test
 
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
 import kafka.utils.TestUtils
-import kafka.server.QuorumTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.{Collections, Properties}
-import scala.jdk.CollectionConverters._
 
 class ProducerCompressionTest extends QuorumTestHarness {
 
@@ -37,18 +37,18 @@ class ProducerCompressionTest extends QuorumTestHarness {
   private val topic = "topic"
   private val numRecords = 2000
 
-  private var server: KafkaServer = null
+  private var broker: KafkaBroker = null
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
-    server = TestUtils.createServer(KafkaConfig.fromProps(props))
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull)
+    broker = createBroker(new KafkaConfig(props))
   }
 
   @AfterEach
   override def tearDown(): Unit = {
-    TestUtils.shutdownServers(Seq(server))
+    TestUtils.shutdownServers(Seq(broker))
     super.tearDown()
   }
 
@@ -58,11 +58,18 @@ class ProducerCompressionTest extends QuorumTestHarness {
    * Compressed messages should be able to sent and consumed correctly
    */
   @ParameterizedTest
-  @MethodSource(Array("parameters"))
-  def testCompression(compression: String): Unit = {
+  @CsvSource(value = Array(
+    "kraft,none",
+    "kraft,gzip",
+    "kraft,snappy",
+    "kraft,lz4",
+    "kraft,zstd",
+    "zk,gzip"
+  ))
+  def testCompression(quorum: String, compression: String): Unit = {
 
     val producerProps = new Properties()
-    val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(server))
+    val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
     producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
@@ -72,7 +79,13 @@ class ProducerCompressionTest extends QuorumTestHarness {
 
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
+      val admin = TestUtils.createAdminClient(Seq(broker),
+        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+      try {
+        TestUtils.createTopicWithAdmin(admin, topic, Seq(broker))
+      } finally {
+        admin.close()
+      }
       val partition = 0
 
       // prepare the messages
@@ -103,15 +116,3 @@ class ProducerCompressionTest extends QuorumTestHarness {
     }
   }
 }
-
-object ProducerCompressionTest {
-  def parameters: java.util.stream.Stream[Arguments] = {
-    Seq(
-      Arguments.of("none"),
-      Arguments.of("gzip"),
-      Arguments.of("snappy"),
-      Arguments.of("lz4"),
-      Arguments.of("zstd")
-    ).asJava.stream()
-  }
-}
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index 4f673cdd60..c64d25fe4e 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -18,26 +18,27 @@ package kafka.tools
 
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
-
 import scala.collection.Seq
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer, NoRecordsException}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.kafka.common.utils.Exit
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 @deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
   override def generateConfigs: Seq[KafkaConfig] =
-    TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
+    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, new Properties()))
 
   val exited = new AtomicBoolean(false)
 
@@ -57,8 +58,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testCommitOffsetsThrowTimeoutException(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCommitOffsetsThrowTimeoutException(quorum: String): Unit = {
     val consumerProps = new Properties
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -70,8 +72,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
     assertThrows(classOf[TimeoutException], () => mirrorMakerConsumer.commit())
   }
 
-  @Test
-  def testCommitOffsetsRemoveNonExistentTopics(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCommitOffsetsRemoveNonExistentTopics(quorum: String): Unit = {
     val consumerProps = new Properties
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -85,8 +88,9 @@ class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
     assertTrue(mirrorMakerConsumer.offsets.isEmpty, "Offsets for non-existent topics should be removed")
   }
 
-  @Test
-  def testCommaSeparatedRegex(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCommaSeparatedRegex(quorum: String): Unit = {
     val topic = "new-topic"
     val msg = "a test message"
 
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 1a383a8fbc..1bbde3ffb6 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -35,19 +35,20 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.ByteUtils
 import org.apache.kafka.common.{TopicPartition, requests}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
 class EdgeCaseRequestTest extends KafkaServerTestHarness {
 
   def generateConfigs = {
-    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
     props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
     List(KafkaConfig.fromProps(props))
   }
 
-  private def socketServer = servers.head.socketServer
+  private def socketServer = brokers.head.socketServer
 
   private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
     new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)))
@@ -116,8 +117,9 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testProduceRequestWithNullClientId(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceRequestWithNullClientId(quorum: String): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition(topic, 0)
     val correlationId = -1
@@ -161,23 +163,27 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode), "There should be no error")
   }
 
-  @Test
-  def testHeaderOnlyRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testHeaderOnlyRequest(quorum: String): Unit = {
     verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, 1))
   }
 
-  @Test
-  def testInvalidApiKeyRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidApiKeyRequest(quorum: String): Unit = {
     verifyDisconnect(requestHeaderBytes(-1, 0))
   }
 
-  @Test
-  def testInvalidApiVersionRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidApiVersionRequest(quorum: String): Unit = {
     verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, -1))
   }
 
-  @Test
-  def testMalformedHeaderRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMalformedHeaderRequest(quorum: String): Unit = {
     val serializedBytes = {
       // Only send apiKey and apiVersion
       val buffer = ByteBuffer.allocate(
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 96aeac5fa6..70554d9427 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.params.ParameterizedTest
@@ -251,9 +251,11 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   }
 
   // Verify that if controller is in the midst of processing a request, shutdown completes
-  // without waiting for request timeout.
-  @Test
-  def testControllerShutdownDuringSend(): Unit = {
+  // without waiting for request timeout. Since this involves LeaderAndIsr request, it is
+  // ZK-only for now.
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testControllerShutdownDuringSend(quorum: String): Unit = {
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val listenerName = ListenerName.forSecurityProtocol(securityProtocol)