You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/01/06 18:01:57 UTC
[kafka] branch trunk updated: MINOR: enable KRaft mode in CreateTopicsRequestTest (#11629)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b135bb MINOR: enable KRaft mode in CreateTopicsRequestTest (#11629)
6b135bb is described below
commit 6b135bb59b4b80e47468efc00ca909497e53fb79
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Jan 6 09:59:31 2022 -0800
MINOR: enable KRaft mode in CreateTopicsRequestTest (#11629)
Reviewers: Jason Gustafson <ja...@confluent.io>, dengziming <de...@gmail.com>
---
.../kafka/api/IntegrationTestHarness.scala | 11 +++--
.../server/AbstractCreateTopicsRequestTest.scala | 37 ++++++++++++-----
.../kafka/server/CreateTopicsRequestTest.scala | 47 +++++++++++++++-------
3 files changed, 66 insertions(+), 29 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index bc4166c..556f078 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -84,11 +84,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit = {
if (isKRaftTest()) {
props.foreach { config =>
- // Add a security protocol for the CONTROLLER endpoint, if one is not already set.
+ // Add a security protocol for the controller endpoints, if one is not already set.
val securityPairs = config.getProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "").split(",")
- if (!securityPairs.exists(_.startsWith("CONTROLLER:"))) {
- config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
- (securityPairs ++ Seq(s"CONTROLLER:${controllerListenerSecurityProtocol.toString}")).mkString(","))
+ val toAdd = config.getProperty(KafkaConfig.ControllerListenerNamesProp, "").split(",").filter{
+ case e => !securityPairs.exists(_.startsWith(s"${e}:"))
+ }
+ if (toAdd.nonEmpty) {
+ config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, (securityPairs ++
+ toAdd.map(e => s"${e}:${controllerListenerSecurityProtocol.toString}")).mkString(","))
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index b6036b7..661086c 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -91,8 +91,16 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
topic
}
+ def createTopicsSocketServer: SocketServer = {
+ if (isKRaftTest()) {
+ anySocketServer
+ } else {
+ controllerSocketServer
+ }
+ }
+
protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
- val response = sendCreateTopicRequest(request)
+ val response = sendCreateTopicRequest(request, createTopicsSocketServer)
assertFalse(response.errorCounts().keySet().asScala.exists(_.code() > 0),
s"There should be no errors, found ${response.errorCounts().keySet().asScala.mkString(", ")},")
@@ -100,7 +108,7 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
request.data.topics.forEach { topic =>
def verifyMetadata(socketServer: SocketServer) = {
val metadata = sendMetadataRequest(
- new MetadataRequest.Builder(List(topic.name()).asJava, false).build()).topicMetadata.asScala
+ new MetadataRequest.Builder(List(topic.name()).asJava, false).build(), socketServer).topicMetadata.asScala
val metadataForTopic = metadata.filter(_.topic == topic.name()).head
val partitions = if (!topic.assignments().isEmpty)
@@ -136,11 +144,13 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
}
}
- // Verify controller broker has the correct metadata
- verifyMetadata(controllerSocketServer)
+ if (!isKRaftTest()) {
+ // Verify controller broker has the correct metadata
+ verifyMetadata(controllerSocketServer)
+ }
if (!request.data.validateOnly) {
// Wait until metadata is propagated and validate non-controller broker has the correct metadata
- TestUtils.waitForPartitionMetadata(servers, topic.name(), 0)
+ TestUtils.waitForPartitionMetadata(brokers, topic.name(), 0)
}
verifyMetadata(notControllerSocketServer)
}
@@ -152,7 +162,7 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest,
expectedResponse: Map[String, ApiError],
checkErrorMessage: Boolean = true): Unit = {
- val response = sendCreateTopicRequest(request)
+ val response = sendCreateTopicRequest(request, createTopicsSocketServer)
assertEquals(expectedResponse.size, response.data().topics().size, "The response size should match")
expectedResponse.foreach { case (topicName, expectedError) =>
@@ -173,19 +183,24 @@ abstract class AbstractCreateTopicsRequestTest extends BaseRequestTest {
}
protected def validateTopicExists(topic: String): Unit = {
- TestUtils.waitForPartitionMetadata(servers, topic, 0)
+ TestUtils.waitForPartitionMetadata(brokers, topic, 0)
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
assertTrue(metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE), "The topic should be created")
}
- protected def sendCreateTopicRequest(request: CreateTopicsRequest,
- socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {
+ protected def sendCreateTopicRequest(
+ request: CreateTopicsRequest,
+ socketServer: SocketServer = controllerSocketServer
+ ): CreateTopicsResponse = {
connectAndReceive[CreateTopicsResponse](request, socketServer)
}
- protected def sendMetadataRequest(request: MetadataRequest): MetadataResponse = {
- connectAndReceive[MetadataResponse](request)
+ protected def sendMetadataRequest(
+ request: MetadataRequest,
+ socketServer: SocketServer = anySocketServer
+ ): MetadataResponse = {
+ connectAndReceive[MetadataResponse](request, socketServer)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 0f72ee2..94eb213 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -25,14 +25,15 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
-
- @Test
- def testValidCreateTopicsRequests(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testValidCreateTopicsRequests(quorum: String): Unit = {
// Generated assignments
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1"))))
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic2", replicationFactor = 3))))
@@ -60,8 +61,9 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
topicReq("topic14", replicationFactor = -1, numPartitions = 2))))
}
- @Test
- def testErrorCreateTopicsRequests(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testErrorCreateTopicsRequests(quorum: String): Unit = {
val existingTopic = "existing-topic"
createTopic(existingTopic, 1, 1)
// Basic
@@ -98,9 +100,18 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
), checkErrorMessage = false
)
validateTopicExists("partial-none")
+ }
- // Timeout
- // We don't expect a request to ever complete within 1ms. A timeout of 1 ms allows us to test the purgatory timeout logic.
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk"))
+ def testCreateTopicsWithVeryShortTimeouts(quorum: String): Unit = {
+ // When using ZooKeeper, we don't expect a request to ever complete within 1ms.
+ // A timeout of 1 ms allows us to test the purgatory timeout logic.
+ //
+ // Note: we do not test KRaft here because its behavior is different. Server-side
+ // timeouts are much less likely to happen with KRaft since the operation is much
+ // faster. Additionally, if a server-side timeout does happen, the operation is
+ // usually not performed.
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("error-timeout", numPartitions = 10, replicationFactor = 3)), timeout = 1),
Map("error-timeout" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
@@ -120,8 +131,10 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
validateTopicExists("error-timeout-negative")
}
- @Test
- def testInvalidCreateTopicsRequests(): Unit = {
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidCreateTopicsRequests(quorum: String): Unit = {
// Partitions/ReplicationFactor and ReplicaAssignment
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
@@ -134,15 +147,21 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)), checkErrorMessage = false)
}
- @Test
- def testNotController(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk"))
+ def testNotController(quorum: String): Unit = {
+ // Note: we don't run this test when in KRaft mode, because KRaft doesn't have this
+ // behavior of returning NOT_CONTROLLER. Instead, the request is forwarded.
val req = topicsReq(Seq(topicReq("topic1")))
val response = sendCreateTopicRequest(req, notControllerSocketServer)
assertEquals(1, response.errorCounts().get(Errors.NOT_CONTROLLER))
}
- @Test
- def testCreateTopicsRequestVersions(): Unit = {
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk"))
+ def testCreateTopicsRequestVersions(quorum: String): Unit = {
+ // Note: we don't run this test when in KRaft mode, because KRaft does not yet support returning topic
+ // configs from CreateTopics.
for (version <- ApiKeys.CREATE_TOPICS.oldestVersion to ApiKeys.CREATE_TOPICS.latestVersion) {
val topic = s"topic_$version"
val data = new CreateTopicsRequestData()