You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/19 17:06:59 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11410: MINOR: Make TestUtils usable for KRaft mode

hachikuji commented on a change in pull request #11410:
URL: https://github.com/apache/kafka/pull/11410#discussion_r732060868



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -346,6 +367,97 @@ object TestUtils extends Logging {
       config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version)
   }
 
+  def createAdminClient[B <: KafkaBroker](
+      brokers: Seq[B],
+      adminConfig: Properties): Admin = {
+    val adminClientProperties = if (adminConfig.isEmpty) {
+      val newConfig = new Properties()
+      newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers))
+      newConfig
+    } else {
+      adminConfig
+    }
+    Admin.create(adminClientProperties)
+  }
+
+  def createTopicWithAdmin[B <: KafkaBroker](

Review comment:
       There seems to be some duplication with `IntegrationTestUtils.createTopic`. 

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -346,6 +367,97 @@ object TestUtils extends Logging {
       config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version)
   }
 
+  def createAdminClient[B <: KafkaBroker](
+      brokers: Seq[B],
+      adminConfig: Properties): Admin = {
+    val adminClientProperties = if (adminConfig.isEmpty) {

Review comment:
       Perhaps instead of checking if the config is empty, we can check if the bootstrap servers property is defined?

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1640,6 +1800,37 @@ object TestUtils extends Logging {
     }, s"Timed out waiting for brokerId $brokerId to come online")
   }
 
+  def getReplicaAssignmentForTopics[B <: KafkaBroker](
+      topicNames: Seq[String],
+      brokers: Seq[B],
+      adminConfig: Properties = new Properties): Map[TopicPartition, Seq[Int]] = {
+    val adminClient = createAdminClient(brokers, adminConfig)
+    val results = new mutable.HashMap[TopicPartition, Seq[Int]]
+    try {
+      adminClient.describeTopics(topicNames.toList.asJava).topicNameValues().forEach {
+        case (topicName, future) =>
+          try {
+            val description = future.get()
+            description.partitions().forEach {
+              case partition =>
+                val topicPartition = new TopicPartition(topicName, partition.partition())
+                results.put(topicPartition, partition.replicas().asScala.map(_.id))
+            }
+          } catch {
+            case e: ExecutionException => if (e.getCause != null &&
+              e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) {
+              // ignore

Review comment:
       Wondering if it would be better to let this propagate instead of returning an empty result. At least maybe we can mention the behavior in a doc comment.

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -1220,37 +1348,42 @@ object TestUtils extends Logging {
     }
   }
 
-
-  def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = {
+  def verifyTopicDeletion[B <: KafkaBroker](
+      zkClient: KafkaZkClient,
+      topic: String,
+      numPartitions: Int,
+      brokers: Seq[B]): Unit = {
     val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
-    // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-    waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
-      "Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted".format(topic))
-    waitUntilTrue(() => !zkClient.topicExists(topic),
-      "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic))
+    if (zkClient != null) {

Review comment:
       I wonder if we need this logic. It seems not necessary for kraft clusters, so why do we need it for zk clusters?

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -346,6 +367,97 @@ object TestUtils extends Logging {
       config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version)
   }
 
+  def createAdminClient[B <: KafkaBroker](
+      brokers: Seq[B],
+      adminConfig: Properties): Admin = {
+    val adminClientProperties = if (adminConfig.isEmpty) {
+      val newConfig = new Properties()
+      newConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerListStrFromServers(brokers))
+      newConfig
+    } else {
+      adminConfig
+    }
+    Admin.create(adminClientProperties)
+  }
+
+  def createTopicWithAdmin[B <: KafkaBroker](
+      topic: String,
+      numPartitions: Int = 1,
+      replicationFactor: Int = 1,
+      brokers: Seq[B],
+      topicConfig: Properties = new Properties,
+      adminConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
+    val adminClient = createAdminClient(brokers, adminConfig)
+    try {
+      val configsMap = new java.util.HashMap[String, String]()
+      topicConfig.forEach((k, v) => configsMap.put(k.toString, v.toString))
+      try {
+        adminClient.createTopics(Collections.singletonList(new NewTopic(
+          topic, numPartitions, replicationFactor.toShort).configs(configsMap))).all().get()
+      } catch {
+        case e: ExecutionException => if (e.getCause != null &&
+          e.getCause.isInstanceOf[TopicExistsException] &&
+          topicHasSameNumPartitionsAndReplicationFactor(adminClient, topic, numPartitions, replicationFactor)) {
+        } else {

Review comment:
       nit: this empty `if` looks a little weird. Couldn't we invert the check?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org