You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/22 23:40:18 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #11790: MINOR: Cleanup admin creation logic in integration tests

jsancio commented on a change in pull request #11790:
URL: https://github.com/apache/kafka/pull/11790#discussion_r812435503



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -445,33 +453,34 @@ object TestUtils extends Logging {
   }
 
   def createOffsetsTopicWithAdmin[B <: KafkaBroker](
-      brokers: Seq[B],
-      adminConfig: Properties = new Properties) = {
+    admin: Admin,
+    brokers: Seq[B]
+  ): Map[Int, Int] = {
     val broker = brokers.head
-    createTopicWithAdmin(topic = Topic.GROUP_METADATA_TOPIC_NAME,
+    createTopicWithAdmin(
+      admin = admin,
+      topic = Topic.GROUP_METADATA_TOPIC_NAME,
       numPartitions = broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
       replicationFactor = broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       brokers = brokers,
       topicConfig = broker.groupCoordinator.offsetsTopicConfigs,
-      adminConfig = adminConfig)
+    )
   }
 
   def deleteTopicWithAdmin[B <: KafkaBroker](
-      topic: String,
-      brokers: Seq[B],
-      adminConfig: Properties = new Properties): Unit = {
-    val adminClient = createAdminClient(brokers, adminConfig)
+    admin: Admin,
+    topic: String,
+    brokers: Seq[B],
+  ): Unit = {
     try {
-      adminClient.deleteTopics(Collections.singletonList(topic)).all().get()
+      admin.deleteTopics(Collections.singletonList(topic)).all().get()
     } catch {
       case e: ExecutionException => if (e.getCause != null &&
           e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) {
         // ignore
       } else {
         throw e
       }

Review comment:
       I think you should be able to rewrite this to:
   ```suggestion
         case e: ExecutionException if (e.getCause != null &&
             e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) =>
           // ignore
   ```

##########
File path: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
##########
@@ -88,14 +88,14 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
     def createProducer(broker: KafkaBroker): KafkaProducer[Integer, String] =
       TestUtils.createProducer(
-        TestUtils.getBrokerListStrFromServers(Seq(broker)),
+        bootstrapServers(),
         keySerializer = new IntegerSerializer,
         valueSerializer = new StringSerializer
       )
 
     def createConsumer(broker: KafkaBroker): KafkaConsumer[Integer, String] =

Review comment:
       This parameter is not used.

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -369,60 +362,75 @@ object TestUtils extends Logging {
       config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version)
   }
 
-  def createAdminClient[B <: KafkaBroker](
-      brokers: Seq[B],
-      adminConfig: Properties): Admin = {
-    val adminClientProperties = new Properties(adminConfig)
+ def createAdminClient[B <: KafkaBroker](
+    brokers: Seq[B],
+    listenerName: ListenerName,
+    adminConfig: Properties
+  ): Admin = {
+    val adminClientProperties = new Properties()
+    adminClientProperties.putAll(adminConfig)
     if (!adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) {
-      adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
-        bootstrapServers(brokers, brokers.head.config.interBrokerListenerName))
+      adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(brokers, listenerName))
     }
     Admin.create(adminClientProperties)
   }
 
+  def withAdmin[B <: KafkaBroker, T](
+    brokers: Seq[B],
+    listenerName: ListenerName,
+    adminConfig: Properties = new Properties
+  )(
+    fn: Admin => T
+  ): T = {
+    val admin = createAdminClient(brokers, listenerName, adminConfig)
+    try {
+      fn(admin)
+    } finally {
+      admin.close()
+    }

Review comment:
       How about using `resource` from this type?

##########
File path: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
##########
@@ -88,14 +88,14 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
     def createProducer(broker: KafkaBroker): KafkaProducer[Integer, String] =

Review comment:
       This parameter is not used.

##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -58,7 +58,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     _brokers.asInstanceOf[mutable.Buffer[KafkaServer]]
   }
 
-  var brokerList: String = null

Review comment:
       :+1: 

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -220,17 +220,10 @@ object TestUtils extends Logging {
     }
   }
 
-  def getBrokerListStrFromServers[B <: KafkaBroker](
-      brokers: Seq[B],
-      protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = {
-    brokers.map { s =>
-      val listener = s.config.effectiveAdvertisedListeners.find(_.securityProtocol == protocol).getOrElse(
-        sys.error(s"Could not find listener with security protocol $protocol"))
-      formatAddress(listener.host, boundPort(s, protocol))
-    }.mkString(",")
-  }
-
-  def bootstrapServers[B <: KafkaBroker](brokers: Seq[B], listenerName: ListenerName): String = {
+  def bootstrapServers[B <: KafkaBroker](
+    brokers: Seq[B],
+    listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)

Review comment:
       What do you think about removing the default value for `listenerName` and creating another method named `plaintextBootstrapServers` that just take a list of `KafkaBroker`.

##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -141,25 +144,51 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     createBrokers(startup)
   }
 
+  def createOffsetsTopic(
+    listenerName: ListenerName = listenerName,
+    adminClientConfig: Properties = new Properties
+  ): Unit = {
+    if (isKRaftTest()) {
+      TestUtils.withAdmin(brokers, listenerName, adminClientConfig) { admin =>

Review comment:
       How about removing `withAdmin` and manually composing this with `resource` and `createAdminClient` in `TestUtils`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org