You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/08/03 09:45:53 UTC

[kafka] branch trunk updated: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long (#9092)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c2991a  KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long (#9092)
5c2991a is described below

commit 5c2991aff66e58c4ab6a04bef85e1cac4ef8f5f6
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon Aug 3 11:44:42 2020 +0200

    KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long (#9092)
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
 core/src/main/scala/kafka/server/ConfigHandler.scala                | 2 +-
 .../main/scala/kafka/server/ControllerMutationQuotaManager.scala    | 4 ++++
 core/src/main/scala/kafka/server/DynamicConfig.scala                | 6 +++---
 .../test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala  | 5 +++--
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 97111be..5f67177 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -146,7 +146,7 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
     quotaManagers.request.updateQuota(sanitizedUser, clientId, sanitizedClientId, requestQuota)
     val controllerMutationQuota =
       if (config.containsKey(DynamicConfig.Client.ControllerMutationOverrideProp))
-        Some(new Quota(config.getProperty(DynamicConfig.Client.ControllerMutationOverrideProp).toLong.toDouble, true))
+        Some(new Quota(config.getProperty(DynamicConfig.Client.ControllerMutationOverrideProp).toDouble, true))
       else
         None
     quotaManagers.controllerMutation.updateQuota(sanitizedUser, clientId, sanitizedClientId, controllerMutationQuota)
diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index 8639a04..bd055a8 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -129,6 +129,10 @@ class PermissiveControllerMutationQuota(private val time: Time,
   }
 }
 
+object ControllerMutationQuotaManager {
+  val QuotaControllerMutationDefault = Int.MaxValue.toDouble
+}
+
 /**
  * The ControllerMutationQuotaManager is a specialized ClientQuotaManager used in the context
  * of throttling controller's operations/mutations.
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 50a5b62..0a64382 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -89,7 +89,7 @@ object DynamicConfig {
     val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaDefault
     val DefaultConsumerOverride = ClientQuotaManagerConfig.QuotaDefault
     val DefaultRequestOverride = ClientRequestQuotaManager.QuotaRequestPercentDefault
-    val DefaultControllerMutationOverride = ClientQuotaManagerConfig.QuotaDefault
+    val DefaultControllerMutationOverride = ControllerMutationQuotaManager.QuotaControllerMutationDefault
 
     // Documentation
     val ProducerOverrideDoc = "A rate representing the upper bound (bytes/sec) for producer traffic."
@@ -103,7 +103,7 @@ object DynamicConfig {
       .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
       .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
       .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
-      .define(ControllerMutationOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ControllerMutationOverrideDoc)
+      .define(ControllerMutationOverrideProp, DOUBLE, DefaultControllerMutationOverride, MEDIUM, ControllerMutationOverrideDoc)
 
     def configKeys = clientConfigs.configKeys
 
@@ -118,7 +118,7 @@ object DynamicConfig {
       .define(Client.ProducerByteRateOverrideProp, LONG, Client.DefaultProducerOverride, MEDIUM, Client.ProducerOverrideDoc)
       .define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
       .define(Client.RequestPercentageOverrideProp, DOUBLE, Client.DefaultRequestOverride, MEDIUM, Client.RequestOverrideDoc)
-      .define(Client.ControllerMutationOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ControllerMutationOverrideDoc)
+      .define(Client.ControllerMutationOverrideProp, DOUBLE, Client.DefaultControllerMutationOverride, MEDIUM, Client.ControllerMutationOverrideDoc)
 
     def configKeys = userConfigs.configKeys
 
diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index c435fb9..1a1170e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -110,13 +110,14 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
 
   @Test
   def testSetUnsetQuota(): Unit = {
+    val rate = 1.5
     val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "User")
     // Default Value
     waitUserQuota(principal.getName, Long.MaxValue)
     // Define a new quota
-    defineUserQuota(principal.getName, Some(ControllerMutationRate))
+    defineUserQuota(principal.getName, Some(rate))
     // Check it
-    waitUserQuota(principal.getName, ControllerMutationRate)
+    waitUserQuota(principal.getName, rate)
     // Remove it
     defineUserQuota(principal.getName, None)
     // Back to the default