You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/09 08:19:10 UTC

[GitHub] [kafka] dengziming opened a new pull request, #12614: MINOR: Support KRaft mode in RequestQuotaTest

dengziming opened a new pull request, #12614:
URL: https://github.com/apache/kafka/pull/12614

   *More detailed description of your change*
   Support kraft mode in RequestQuotaTest
   
   *Summary of testing strategy (including rationale)*
   QA
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on pull request #12614: MINOR: Support KRaft mode in RequestQuotaTest

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on PR #12614:
URL: https://github.com/apache/kafka/pull/12614#issuecomment-1676972359

   Fixed in #14201


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming closed pull request #12614: MINOR: Support KRaft mode in RequestQuotaTest

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming closed pull request #12614: MINOR: Support KRaft mode in RequestQuotaTest
URL: https://github.com/apache/kafka/pull/12614


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on a diff in pull request #12614: MINOR: Support KRaft mode in RequestQuotaTest

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12614:
URL: https://github.com/apache/kafka/pull/12614#discussion_r966769259


##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -90,93 +104,142 @@ class RequestQuotaTest extends BaseRequestTest {
     super.setUp(testInfo)
 
     createTopic(topic, numPartitions)
-    leaderNode = servers.head
+    leaderNode = brokers.head
 
     // Change default client-id request quota to a small value and a single unthrottledClient with a large quota
     val quotaProps = new Properties()
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
     quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
     quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
-    adminZkClient.changeClientIdConfig("<default>", quotaProps)
+    changeClientIdConfig("<default>", quotaProps)
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "2000")
-    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
 
     // Client ids with small producer and consumer (fetch) quotas. Quota values were picked so that both
     // producer/consumer and request quotas are violated on the first produce/consume operation, and the delay due to
     // producer/consumer quota violation will be longer than the delay due to request quota violation.
     quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "1")
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
-    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps)
     quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "1")
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
-    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
 
     TestUtils.retry(20000) {
-      val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request
+      val quotaManager = brokers.head.dataPlaneRequestProcessor.quotas.request
       assertEquals(Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"), s"Default request quota not set")
       assertEquals(Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId), s"Request quota override not set")
-      val produceQuotaManager = servers.head.dataPlaneRequestProcessor.quotas.produce
+      val produceQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.produce
       assertEquals(Quota.upperBound(1), produceQuotaManager.quota("some-user", smallQuotaProducerClientId), s"Produce quota override not set")
-      val consumeQuotaManager = servers.head.dataPlaneRequestProcessor.quotas.fetch
+      val consumeQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.fetch
       assertEquals(Quota.upperBound(1), consumeQuotaManager.quota("some-user", smallQuotaConsumerClientId), s"Consume quota override not set")
     }
   }
 
+  private def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
+    if (isKRaftTest()) {
+      val admin = createAdminClient()
+      admin.alterClientQuotas(Collections.singleton(
+        new ClientQuotaAlteration(
+          new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
+          configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble)}.toList.asJava)
+      )).all().get()
+    } else {
+      adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
+    }
+  }
+
   @AfterEach
   override def tearDown(): Unit = {
     try executor.shutdownNow()
     finally super.tearDown()
   }
 
-  @Test
-  def testResponseThrottleTime(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClientActions ++ RequestQuotaTest.ClusterActionsWithThrottle)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseThrottleTime(quorum: String): Unit = {
+    val apiKeys = if (isKRaftTest()) {
+      (clientActions ++ clusterActionsWithThrottle).filterNot(_.forwardable)
+    } else {
+      clientActions ++ clusterActionsWithThrottle
+    }
+    for (apiKey <- apiKeys)
       submitTest(apiKey, () => checkRequestThrottleTime(apiKey))
 
     waitAndCheckResults()
   }
 
-  @Test
-  def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(quorum: String): Unit = {
     submitTest(ApiKeys.PRODUCE, () => checkSmallQuotaProducerRequestThrottleTime())
     waitAndCheckResults()
   }
 
-  @Test
-  def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(quorum: String): Unit = {
     submitTest(ApiKeys.FETCH, () => checkSmallQuotaConsumerRequestThrottleTime())
     waitAndCheckResults()
   }
 
-  @Test
-  def testUnthrottledClient(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClientActions) {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUnthrottledClient(quorum: String): Unit = {
+    for (apiKey <- clientActions) {
       submitTest(apiKey, () => checkUnthrottledClient(apiKey))
     }
 
     waitAndCheckResults()
   }
 
-  @Test
-  def testExemptRequestTime(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClusterActions -- RequestQuotaTest.ClusterActionsWithThrottle) {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testExemptRequestTime(quorum: String): Unit = {
+    for (apiKey <- clusterActions -- clusterActionsWithThrottle) {
       submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
     }
 
     waitAndCheckResults()
   }
 
-  @Test
-  def testUnauthorizedThrottle(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUnauthorizedThrottle(quorum: String): Unit = {
     RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-    for (apiKey <- ApiKeys.zkBrokerApis.asScala) {
+    val apiKeys = if (isKRaftTest()) ApiKeys.kraftBrokerApis().asScala.filterNot(_.forwardable) else ApiKeys.zkBrokerApis.asScala

Review Comment:
   ditto



##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -90,93 +104,142 @@ class RequestQuotaTest extends BaseRequestTest {
     super.setUp(testInfo)
 
     createTopic(topic, numPartitions)
-    leaderNode = servers.head
+    leaderNode = brokers.head
 
     // Change default client-id request quota to a small value and a single unthrottledClient with a large quota
     val quotaProps = new Properties()
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
     quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
     quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "2000")
-    adminZkClient.changeClientIdConfig("<default>", quotaProps)
+    changeClientIdConfig("<default>", quotaProps)
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "2000")
-    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
 
     // Client ids with small producer and consumer (fetch) quotas. Quota values were picked so that both
     // producer/consumer and request quotas are violated on the first produce/consume operation, and the delay due to
     // producer/consumer quota violation will be longer than the delay due to request quota violation.
     quotaProps.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "1")
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
-    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(smallQuotaProducerClientId), quotaProps)
     quotaProps.put(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "1")
     quotaProps.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "0.01")
-    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
+    changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
 
     TestUtils.retry(20000) {
-      val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request
+      val quotaManager = brokers.head.dataPlaneRequestProcessor.quotas.request
       assertEquals(Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"), s"Default request quota not set")
       assertEquals(Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId), s"Request quota override not set")
-      val produceQuotaManager = servers.head.dataPlaneRequestProcessor.quotas.produce
+      val produceQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.produce
       assertEquals(Quota.upperBound(1), produceQuotaManager.quota("some-user", smallQuotaProducerClientId), s"Produce quota override not set")
-      val consumeQuotaManager = servers.head.dataPlaneRequestProcessor.quotas.fetch
+      val consumeQuotaManager = brokers.head.dataPlaneRequestProcessor.quotas.fetch
       assertEquals(Quota.upperBound(1), consumeQuotaManager.quota("some-user", smallQuotaConsumerClientId), s"Consume quota override not set")
     }
   }
 
+  private def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
+    if (isKRaftTest()) {
+      val admin = createAdminClient()
+      admin.alterClientQuotas(Collections.singleton(
+        new ClientQuotaAlteration(
+          new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
+          configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble)}.toList.asJava)
+      )).all().get()
+    } else {
+      adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
+    }
+  }
+
   @AfterEach
   override def tearDown(): Unit = {
     try executor.shutdownNow()
     finally super.tearDown()
   }
 
-  @Test
-  def testResponseThrottleTime(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClientActions ++ RequestQuotaTest.ClusterActionsWithThrottle)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseThrottleTime(quorum: String): Unit = {
+    val apiKeys = if (isKRaftTest()) {
+      (clientActions ++ clusterActionsWithThrottle).filterNot(_.forwardable)

Review Comment:
   Currently, `ClientQuotasDelta` is only published to brokers whereas `ControllerApis.QuotaManagers` is not updated after we update client quota configs, so all request is not throttled in the controller, so we need to filter forwardable ApiKeys here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org