You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/06 04:08:05 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection

showuon commented on code in PR #12381:
URL: https://github.com/apache/kafka/pull/12381#discussion_r914394460


##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -429,6 +430,20 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       verifyProduceConsume(producer, consumer, 10, topic)
     }
 
+    def verifyBrokerToControllerCall(controller: KafkaServer): Unit = {
+      val nonControllerBroker = servers.find(_.config.brokerId != controller.config.brokerId).get
+      val brokerToControllerManager = nonControllerBroker.clientToControllerChannelManager
+      val completionHandler = new TestControllerRequestCompletionHandler()
+      brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new MetadataRequestData()), completionHandler)
+      TestUtils.waitUntilTrue(() => {
+        completionHandler.completed.get() || completionHandler.timedOut.get()
+      }, "Timed out while waiting for broker to controller API call")
+      val response = completionHandler.actualResponse.getOrElse(throw new IllegalStateException("No response recorded even though request is completed"))

Review Comment:
   throwing exception in the test seems not a good practice. Maybe we can change to this:
   ```
   assertTrue(completionHandler.actualResponse.isDefined, "No response recorded even though request is completed")
   val response = completionHandler.actualResponse.get
   ```



##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -429,6 +430,20 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       verifyProduceConsume(producer, consumer, 10, topic)
     }
 
+    def verifyBrokerToControllerCall(controller: KafkaServer): Unit = {
+      val nonControllerBroker = servers.find(_.config.brokerId != controller.config.brokerId).get
+      val brokerToControllerManager = nonControllerBroker.clientToControllerChannelManager
+      val completionHandler = new TestControllerRequestCompletionHandler()
+      brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new MetadataRequestData()), completionHandler)
+      TestUtils.waitUntilTrue(() => {
+        completionHandler.completed.get() || completionHandler.timedOut.get()
+      }, "Timed out while waiting for broker to controller API call")
+      val response = completionHandler.actualResponse.getOrElse(throw new IllegalStateException("No response recorded even though request is completed"))

Review Comment:
   Also, I think we should fail when timeout, so that we know what happened in the request. Ex:
   ```
   assertTrue(completionHandler.timedOut.isEmpty, "broker to controller request is timeout")
   ```
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org