You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/18 01:00:06 UTC

[GitHub] [kafka] hachikuji opened a new pull request #11790: MINOR: Cleanup admin creation logic in integration tests

hachikuji opened a new pull request #11790:
URL: https://github.com/apache/kafka/pull/11790


   There seemed to be a little sloppiness in the integration tests in regard to admin client creation. Not only was there duplicated logic, but it wasn't always clear which listener the admin client was targeting. This made it difficult to tell in the context of authorization tests whether we were indeed testing with the right principal. As an example, we had a method in `TestUtils` which was using the inter-broker listener implicitly. This meant that the test was using the broker principal which had super user privilege implicitly. This was intentional, but I think it would be clearer to make the dependence on this listener explicit. This patch attempts to clean this up a bit by consolidating some of the admin creation logic and making the reliance on the listener clearer.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #11790: MINOR: Cleanup admin creation logic in integration tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11790:
URL: https://github.com/apache/kafka/pull/11790#discussion_r812435503



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -445,33 +453,34 @@ object TestUtils extends Logging {
   }
 
   def createOffsetsTopicWithAdmin[B <: KafkaBroker](
-      brokers: Seq[B],
-      adminConfig: Properties = new Properties) = {
+    admin: Admin,
+    brokers: Seq[B]
+  ): Map[Int, Int] = {
     val broker = brokers.head
-    createTopicWithAdmin(topic = Topic.GROUP_METADATA_TOPIC_NAME,
+    createTopicWithAdmin(
+      admin = admin,
+      topic = Topic.GROUP_METADATA_TOPIC_NAME,
       numPartitions = broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
       replicationFactor = broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       brokers = brokers,
       topicConfig = broker.groupCoordinator.offsetsTopicConfigs,
-      adminConfig = adminConfig)
+    )
   }
 
   def deleteTopicWithAdmin[B <: KafkaBroker](
-      topic: String,
-      brokers: Seq[B],
-      adminConfig: Properties = new Properties): Unit = {
-    val adminClient = createAdminClient(brokers, adminConfig)
+    admin: Admin,
+    topic: String,
+    brokers: Seq[B],
+  ): Unit = {
     try {
-      adminClient.deleteTopics(Collections.singletonList(topic)).all().get()
+      admin.deleteTopics(Collections.singletonList(topic)).all().get()
     } catch {
       case e: ExecutionException => if (e.getCause != null &&
           e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) {
         // ignore
       } else {
         throw e
       }

Review comment:
       I think you should be able to rewrite this to:
   ```suggestion
         case e: ExecutionException if (e.getCause != null &&
             e.getCause.isInstanceOf[UnknownTopicOrPartitionException]) =>
           // ignore
   ```

##########
File path: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
##########
@@ -88,14 +88,14 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
     def createProducer(broker: KafkaBroker): KafkaProducer[Integer, String] =
       TestUtils.createProducer(
-        TestUtils.getBrokerListStrFromServers(Seq(broker)),
+        bootstrapServers(),
         keySerializer = new IntegerSerializer,
         valueSerializer = new StringSerializer
       )
 
     def createConsumer(broker: KafkaBroker): KafkaConsumer[Integer, String] =

Review comment:
       This parameter is not used.

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -369,60 +362,75 @@ object TestUtils extends Logging {
       config.setProperty(KafkaConfig.LogMessageFormatVersionProp, version.version)
   }
 
-  def createAdminClient[B <: KafkaBroker](
-      brokers: Seq[B],
-      adminConfig: Properties): Admin = {
-    val adminClientProperties = new Properties(adminConfig)
+ def createAdminClient[B <: KafkaBroker](
+    brokers: Seq[B],
+    listenerName: ListenerName,
+    adminConfig: Properties
+  ): Admin = {
+    val adminClientProperties = new Properties()
+    adminClientProperties.putAll(adminConfig)
     if (!adminClientProperties.containsKey(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)) {
-      adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
-        bootstrapServers(brokers, brokers.head.config.interBrokerListenerName))
+      adminClientProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(brokers, listenerName))
     }
     Admin.create(adminClientProperties)
   }
 
+  def withAdmin[B <: KafkaBroker, T](
+    brokers: Seq[B],
+    listenerName: ListenerName,
+    adminConfig: Properties = new Properties
+  )(
+    fn: Admin => T
+  ): T = {
+    val admin = createAdminClient(brokers, listenerName, adminConfig)
+    try {
+      fn(admin)
+    } finally {
+      admin.close()
+    }

Review comment:
       How about using `resource` from this type?

##########
File path: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
##########
@@ -88,14 +88,14 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
     def createProducer(broker: KafkaBroker): KafkaProducer[Integer, String] =

Review comment:
       This parameter is not used.

##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -58,7 +58,6 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     _brokers.asInstanceOf[mutable.Buffer[KafkaServer]]
   }
 
-  var brokerList: String = null

Review comment:
       :+1: 

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -220,17 +220,10 @@ object TestUtils extends Logging {
     }
   }
 
-  def getBrokerListStrFromServers[B <: KafkaBroker](
-      brokers: Seq[B],
-      protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = {
-    brokers.map { s =>
-      val listener = s.config.effectiveAdvertisedListeners.find(_.securityProtocol == protocol).getOrElse(
-        sys.error(s"Could not find listener with security protocol $protocol"))
-      formatAddress(listener.host, boundPort(s, protocol))
-    }.mkString(",")
-  }
-
-  def bootstrapServers[B <: KafkaBroker](brokers: Seq[B], listenerName: ListenerName): String = {
+  def bootstrapServers[B <: KafkaBroker](
+    brokers: Seq[B],
+    listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)

Review comment:
       What do you think about removing the default value for `listenerName` and creating another method named `plaintextBootstrapServers` that just take a list of `KafkaBroker`.

##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -141,25 +144,51 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     createBrokers(startup)
   }
 
+  def createOffsetsTopic(
+    listenerName: ListenerName = listenerName,
+    adminClientConfig: Properties = new Properties
+  ): Unit = {
+    if (isKRaftTest()) {
+      TestUtils.withAdmin(brokers, listenerName, adminClientConfig) { admin =>

Review comment:
       How about removing `withAdmin` and manually composing this with `resource` and `createAdminClient` in `TestUtils`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #11790: MINOR: Cleanup admin creation logic in integration tests

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11790:
URL: https://github.com/apache/kafka/pull/11790#discussion_r813357222



##########
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -168,18 +198,41 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
-  def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
+  def createTopicWithAssignment(
+    topic: String,
+    partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
+    listenerName: ListenerName = listenerName
+  ): scala.collection.immutable.Map[Int, Int] =
     if (isKRaftTest()) {
-      TestUtils.createTopicWithAdmin(topic = topic,
-        replicaAssignment = partitionReplicaAssignment,
-        brokers = brokers)
+      resource(createAdminClient(brokers, listenerName)) { admin =>
+        TestUtils.createTopicWithAdmin(
+          admin = admin,
+          topic = topic,
+          replicaAssignment = partitionReplicaAssignment,
+          brokers = brokers
+        )
+      }
     } else {
-      TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, servers)
+      TestUtils.createTopic(
+        zkClient,
+        topic,
+        partitionReplicaAssignment,
+        servers
+      )
     }
 
-  def deleteTopic(topic: String): Unit = {
+  def deleteTopic(
+    topic: String,
+    listenerName: ListenerName = listenerName
+  ): Unit = {
     if (isKRaftTest()) {
-      TestUtils.deleteTopicWithAdmin(topic, brokers)
+

Review comment:
       Extra newline.

##########
File path: core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
##########
@@ -58,7 +59,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), securityProtocol = SecurityProtocol.PLAINTEXT)
+    consumer = TestUtils.createConsumer(bootstrapServers(
+      listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
+      securityProtocol = SecurityProtocol.PLAINTEXT
+    )

Review comment:
       I think the formatting is incorrect here. Should it be?
   
   ```suggestion
       consumer = TestUtils.createConsumer(
         bootstrapServers(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
         securityProtocol = SecurityProtocol.PLAINTEXT
       )
   ```

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -204,7 +204,7 @@ class BrokerMetadataListener(
       batch.records().forEach { messageAndVersion =>
         if (isTraceEnabled) {
           trace("Metadata batch %d: processing [%d/%d]: %s.".format(batch.lastOffset, index + 1,
-            batch.records().size(), messageAndVersion.message().toString()))
+            batch.records().size(), messageAndVersion.message()))

Review comment:
       I didn't notice this earlier but maybe we can just first this to use Scala's `s""` interpolation instead of `format`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #11790: MINOR: Cleanup admin creation logic in integration tests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11790:
URL: https://github.com/apache/kafka/pull/11790#issuecomment-1048345473


   @jsancio Thanks for the comments. I've pushed an update.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11790: MINOR: Cleanup admin creation logic in integration tests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11790:
URL: https://github.com/apache/kafka/pull/11790#discussion_r812466771



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -220,17 +220,10 @@ object TestUtils extends Logging {
     }
   }
 
-  def getBrokerListStrFromServers[B <: KafkaBroker](
-      brokers: Seq[B],
-      protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = {
-    brokers.map { s =>
-      val listener = s.config.effectiveAdvertisedListeners.find(_.securityProtocol == protocol).getOrElse(
-        sys.error(s"Could not find listener with security protocol $protocol"))
-      formatAddress(listener.host, boundPort(s, protocol))
-    }.mkString(",")
-  }
-
-  def bootstrapServers[B <: KafkaBroker](brokers: Seq[B], listenerName: ListenerName): String = {
+  def bootstrapServers[B <: KafkaBroker](
+    brokers: Seq[B],
+    listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)

Review comment:
       Yeah, that's a good suggestion. That makes the dependence on the listener clear.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #11790: MINOR: Cleanup admin creation logic in integration tests

Posted by GitBox <gi...@apache.org>.
jsancio merged pull request #11790:
URL: https://github.com/apache/kafka/pull/11790


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org