You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/08/03 09:45:53 UTC
[kafka] branch trunk updated: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long (#9092)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5c2991a KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long (#9092)
5c2991a is described below
commit 5c2991aff66e58c4ab6a04bef85e1cac4ef8f5f6
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon Aug 3 11:44:42 2020 +0200
KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long (#9092)
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
core/src/main/scala/kafka/server/ConfigHandler.scala | 2 +-
.../main/scala/kafka/server/ControllerMutationQuotaManager.scala | 4 ++++
core/src/main/scala/kafka/server/DynamicConfig.scala | 6 +++---
.../test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala | 5 +++--
4 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 97111be..5f67177 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -146,7 +146,7 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
quotaManagers.request.updateQuota(sanitizedUser, clientId, sanitizedClientId, requestQuota)
val controllerMutationQuota =
if (config.containsKey(DynamicConfig.Client.ControllerMutationOverrideProp))
- Some(new Quota(config.getProperty(DynamicConfig.Client.ControllerMutationOverrideProp).toLong.toDouble, true))
+ Some(new Quota(config.getProperty(DynamicConfig.Client.ControllerMutationOverrideProp).toDouble, true))
else
None
quotaManagers.controllerMutation.updateQuota(sanitizedUser, clientId, sanitizedClientId, controllerMutationQuota)
diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
index 8639a04..bd055a8 100644
--- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
@@ -129,6 +129,10 @@ class PermissiveControllerMutationQuota(private val time: Time,
}
}
+object ControllerMutationQuotaManager {
+ val QuotaControllerMutationDefault = Int.MaxValue.toDouble
+}
+
/**
* The ControllerMutationQuotaManager is a specialized ClientQuotaManager used in the context
* of throttling controller's operations/mutations.
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 50a5b62..0a64382 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -89,7 +89,7 @@ object DynamicConfig {
val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaDefault
val DefaultConsumerOverride = ClientQuotaManagerConfig.QuotaDefault
val DefaultRequestOverride = ClientRequestQuotaManager.QuotaRequestPercentDefault
- val DefaultControllerMutationOverride = ClientQuotaManagerConfig.QuotaDefault
+ val DefaultControllerMutationOverride = ControllerMutationQuotaManager.QuotaControllerMutationDefault
// Documentation
val ProducerOverrideDoc = "A rate representing the upper bound (bytes/sec) for producer traffic."
@@ -103,7 +103,7 @@ object DynamicConfig {
.define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
.define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
.define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
- .define(ControllerMutationOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ControllerMutationOverrideDoc)
+ .define(ControllerMutationOverrideProp, DOUBLE, DefaultControllerMutationOverride, MEDIUM, ControllerMutationOverrideDoc)
def configKeys = clientConfigs.configKeys
@@ -118,7 +118,7 @@ object DynamicConfig {
.define(Client.ProducerByteRateOverrideProp, LONG, Client.DefaultProducerOverride, MEDIUM, Client.ProducerOverrideDoc)
.define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
.define(Client.RequestPercentageOverrideProp, DOUBLE, Client.DefaultRequestOverride, MEDIUM, Client.RequestOverrideDoc)
- .define(Client.ControllerMutationOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ControllerMutationOverrideDoc)
+ .define(Client.ControllerMutationOverrideProp, DOUBLE, Client.DefaultControllerMutationOverride, MEDIUM, Client.ControllerMutationOverrideDoc)
def configKeys = userConfigs.configKeys
diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index c435fb9..1a1170e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -110,13 +110,14 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
@Test
def testSetUnsetQuota(): Unit = {
+ val rate = 1.5
val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "User")
// Default Value
waitUserQuota(principal.getName, Long.MaxValue)
// Define a new quota
- defineUserQuota(principal.getName, Some(ControllerMutationRate))
+ defineUserQuota(principal.getName, Some(rate))
// Check it
- waitUserQuota(principal.getName, ControllerMutationRate)
+ waitUserQuota(principal.getName, rate)
// Remove it
defineUserQuota(principal.getName, None)
// Back to the default