You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/01/06 18:01:57 UTC

[kafka] branch trunk updated: MINOR: enable KRaft mode in CreateTopicsRequestTest (#11629)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b135bb  MINOR: enable KRaft mode in CreateTopicsRequestTest (#11629)
6b135bb is described below

commit 6b135bb59b4b80e47468efc00ca909497e53fb79
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Jan 6 09:59:31 2022 -0800

    MINOR: enable KRaft mode in CreateTopicsRequestTest (#11629)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, dengziming <de...@gmail.com>
---
 .../kafka/api/IntegrationTestHarness.scala         | 11 +++--
 .../server/AbstractCreateTopicsRequestTest.scala   | 37 ++++++++++++-----
 .../kafka/server/CreateTopicsRequestTest.scala     | 47 +++++++++++++++-------
 3 files changed, 66 insertions(+), 29 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index bc4166c..556f078 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -84,11 +84,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
   private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit = {
     if (isKRaftTest()) {
       props.foreach { config =>
-        // Add a security protocol for the CONTROLLER endpoint, if one is not already set.
+        // Add a security protocol for the controller endpoints, if one is not already set.
         val securityPairs = config.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "").split(",")
-        if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
-          config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
-            (securityPairs ++ Seq(s"CONTROLLER:${controllerListenerSecurityProtocol.toString}")).mkString(","))
+        val toAdd = config.getProperty(KafkaConfig.ControllerListenerNamesProp, "").split(",").filter{
+          case e => !securityPairs.exists(_.startsWith(s"${e}:"))
+        }
+        if (toAdd.nonEmpty) {
+          config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, (securityPairs ++
+            toAdd.map(e => s"${e}:${controllerListenerSecurityProtocol.toString}")).mkString(","))
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index b6036b7..661086c 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -91,8 +91,16 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
     topic
   }
 
+  def createTopicsSocketServer: SocketServer = {
+    if (isKRaftTest()) {
+      anySocketServer
+    } else {
+      controllerSocketServer
+    }
+  }
+
   protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
-    val response = sendCreateTopicRequest(request)
+    val response = sendCreateTopicRequest(request, createTopicsSocketServer)
 
     assertFalse(response.errorCounts().keySet().asScala.exists(_.code() > 0),
       s"There should be no errors, found ${response.errorCounts().keySet().asScala.mkString(", ")},")
@@ -100,7 +108,7 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
     request.data.topics.forEach { topic =>
       def verifyMetadata(socketServer: SocketServer) = {
         val metadata = sendMetadataRequest(
-          new MetadataRequest.Builder(List(topic.name()).asJava, false).build()).topicMetadata.asScala
+          new MetadataRequest.Builder(List(topic.name()).asJava, false).build(), socketServer).topicMetadata.asScala
         val metadataForTopic = metadata.filter(_.topic == topic.name()).head
 
         val partitions = if (!topic.assignments().isEmpty)
@@ -136,11 +144,13 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
         }
       }
 
-      // Verify controller broker has the correct metadata
-      verifyMetadata(controllerSocketServer)
+      if (!isKRaftTest()) {
+        // Verify controller broker has the correct metadata
+        verifyMetadata(controllerSocketServer)
+      }
       if (!request.data.validateOnly) {
         // Wait until metadata is propagated and validate non-controller broker has the correct metadata
-        TestUtils.waitForPartitionMetadata(servers, topic.name(), 0)
+        TestUtils.waitForPartitionMetadata(brokers, topic.name(), 0)
       }
       verifyMetadata(notControllerSocketServer)
     }
@@ -152,7 +162,7 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
   protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest,
                                                   expectedResponse: Map[String, ApiError],
                                                   checkErrorMessage: Boolean = true): Unit = {
-    val response = sendCreateTopicRequest(request)
+    val response = sendCreateTopicRequest(request, createTopicsSocketServer)
     assertEquals(expectedResponse.size, response.data().topics().size, "The response size should match")
 
     expectedResponse.foreach { case (topicName, expectedError) =>
@@ -173,19 +183,24 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
   }
 
   protected def validateTopicExists(topic: String): Unit = {
-    TestUtils.waitForPartitionMetadata(servers, topic, 0)
+    TestUtils.waitForPartitionMetadata(brokers, topic, 0)
     val metadata = sendMetadataRequest(
       new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
     assertTrue(metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE), "The topic should be created")
   }
 
-  protected def sendCreateTopicRequest(request: CreateTopicsRequest,
-                                       socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {
+  protected def sendCreateTopicRequest(
+    request: CreateTopicsRequest,
+    socketServer: SocketServer = controllerSocketServer
+  ): CreateTopicsResponse = {
     connectAndReceive[CreateTopicsResponse](request, socketServer)
   }
 
-  protected def sendMetadataRequest(request: MetadataRequest): MetadataResponse = {
-    connectAndReceive[MetadataResponse](request)
+  protected def sendMetadataRequest(
+    request: MetadataRequest,
+    socketServer: SocketServer = anySocketServer
+  ): MetadataResponse = {
+    connectAndReceive[MetadataResponse](request, socketServer)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 0f72ee2..94eb213 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -25,14 +25,15 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.CreateTopicsRequest
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
 class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
-
-  @Test
-  def testValidCreateTopicsRequests(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testValidCreateTopicsRequests(quorum: String): Unit = {
     // Generated assignments
     validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1"))))
     validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic2", replicationFactor = 3))))
@@ -60,8 +61,9 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
       topicReq("topic14", replicationFactor = -1, numPartitions = 2))))
   }
 
-  @Test
-  def testErrorCreateTopicsRequests(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testErrorCreateTopicsRequests(quorum: String): Unit = {
     val existingTopic = "existing-topic"
     createTopic(existingTopic, 1, 1)
     // Basic
@@ -98,9 +100,18 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
       ), checkErrorMessage = false
     )
     validateTopicExists("partial-none")
+  }
 
-    // Timeout
-    // We don't expect a request to ever complete within 1ms. A timeout of 1 ms allows us to test the purgatory timeout logic.
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testCreateTopicsWithVeryShortTimeouts(quorum: String): Unit = {
+    // When using ZooKeeper, we don't expect a request to ever complete within 1 ms.
+    // A timeout of 1 ms allows us to test the purgatory timeout logic.
+    //
+    // Note: we do not test KRaft here because its behavior is different. Server-side
+    // timeouts are much less likely to happen with KRaft since the operation is much
+    // faster. Additionally, if a server-side timeout does happen, the operation is
+    // usually not performed.
     validateErrorCreateTopicsRequests(topicsReq(Seq(
       topicReq("error-timeout", numPartitions = 10, replicationFactor = 3)), timeout = 1),
       Map("error-timeout" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
@@ -120,8 +131,10 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
     validateTopicExists("error-timeout-negative")
   }
 
-  @Test
-  def testInvalidCreateTopicsRequests(): Unit = {
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidCreateTopicsRequests(quorum: String): Unit = {
     // Partitions/ReplicationFactor and ReplicaAssignment
     validateErrorCreateTopicsRequests(topicsReq(Seq(
       topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
@@ -134,15 +147,21 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
       Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)), checkErrorMessage = false)
   }
 
-  @Test
-  def testNotController(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testNotController(quorum: String): Unit = {
+    // Note: we don't run this test when in KRaft mode, because KRaft doesn't have this
+    // behavior of returning NOT_CONTROLLER. Instead, the request is forwarded.
     val req = topicsReq(Seq(topicReq("topic1")))
     val response = sendCreateTopicRequest(req, notControllerSocketServer)
     assertEquals(1, response.errorCounts().get(Errors.NOT_CONTROLLER))
   }
 
-  @Test
-  def testCreateTopicsRequestVersions(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))
+  def testCreateTopicsRequestVersions(quorum: String): Unit = {
+    // Note: we don't run this test when in KRaft mode, because KRaft does not yet support returning topic
+    // configs from CreateTopics.
     for (version <- ApiKeys.CREATE_TOPICS.oldestVersion to ApiKeys.CREATE_TOPICS.latestVersion) {
       val topic = s"topic_$version"
       val data = new CreateTopicsRequestData()