You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/10 23:07:48 UTC
[kafka] branch 2.3 updated: KAFKA-9658; Fix user quota removal (#8232)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 2b26c23 KAFKA-9658; Fix user quota removal (#8232)
2b26c23 is described below
commit 2b26c23d36ef64a2a40044c2b7e50056ec924db8
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Tue Mar 10 12:30:58 2020 -0700
KAFKA-9658; Fix user quota removal (#8232)
Adding (via add-config) a default user, user, or <user, client-id> quota and then removing it via delete-config does not update the quota bound in ClientQuotaManager.Metrics for existing users or <user, client-id> pairs. This causes brokers to continue throttling with the previously set quotas until the brokers restart (or the <user, client-id> stops sending traffic for some time and its sensor expires). This happens only when removing the user or <user, client-id> quota leaves no more quotas to fall back to. Common [...]
The cause of the issue was that `DefaultQuotaCallback.quotaLimit` returned `null` when no default user quota was set, which caused `ClientQuotaManager.updateQuotaMetricConfigs` to skip updating the appropriate sensor, leaving it unchanged with the previous quota. Since `null` is an acceptable return value for `ClientQuotaCallback.quotaLimit`, and is already treated as unlimited quota in other parts of the code, this PR ensures that `ClientQuotaManager.updateQuotaMetricConfigs` update [...]
Reviewers: Jason Gustafson <ja...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
.../scala/kafka/server/ClientQuotaManager.scala | 5 +-
.../unit/kafka/server/ClientQuotaManagerTest.scala | 132 +++++++++++++++------
2 files changed, 98 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 9817f21..87e4923 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -483,7 +483,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// Change the underlying metric config if the sensor has been created
val metric = allMetrics.get(quotaMetricName)
if (metric != null) {
- Option(quotaCallback.quotaLimit(clientQuotaType, metricTags.asJava)).foreach { newQuota =>
+ Option(quotaLimit(metricTags.asJava)).foreach { newQuota =>
info(s"Sensor for $quotaEntity already exists. Changing quota to $newQuota in MetricConfig")
metric.config(getQuotaMetricConfig(newQuota))
}
@@ -493,8 +493,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
case (metricName, metric) =>
val metricTags = metricName.tags
- Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).foreach { quota =>
- val newQuota = quota.asInstanceOf[Double]
+ Option(quotaLimit(metricTags)).foreach { newQuota =>
if (newQuota != metric.config.quota.bound) {
info(s"Sensor for quota-id $metricTags already exists. Setting quota to $newQuota in MetricConfig")
metric.config(getQuotaMetricConfig(newQuota))
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index e10d4b2..6598d3e 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -190,19 +190,79 @@ class ClientQuotaManagerTest {
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}
+ private def checkQuota(quotaManager: ClientQuotaManager, user: String, clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): Unit = {
+ assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
+ val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * config.numQuotaSamples)
+ if (expectThrottle)
+ assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
+ else
+ assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
+ }
+
@Test
- def testQuotaConfigPrecedence() {
- val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
- newMetrics, Produce, time, "")
-
- def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
- assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
- val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * config.numQuotaSamples)
- if (expectThrottle)
- assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
- else
- assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
+ def testSetAndRemoveDefaultUserQuota(): Unit = {
+ // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+ val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue),
+ newMetrics, Produce, time, "")
+
+ try {
+ // no quota set yet, should not throttle
+ checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+
+ // Set default <user> quota config
+ quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, Some(new Quota(10, true)))
+ checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+ // Remove default <user> quota config, back to no quotas
+ quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, None)
+ checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+ } finally {
+ quotaManager.shutdown()
}
+ }
+
+ @Test
+ def testSetAndRemoveUserQuota(): Unit = {
+ // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+ val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue),
+ newMetrics, Produce, time, "")
+
+ try {
+ // Set <user> quota config
+ quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(10, true)))
+ checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+ // Remove <user> quota config, back to no quotas
+ quotaManager.updateQuota(Some("userA"), None, None, None)
+ checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+ } finally {
+ quotaManager.shutdown()
+ }
+ }
+
+ @Test
+ def testSetAndRemoveUserClientQuota(): Unit = {
+ // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+ val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue),
+ newMetrics, Produce, time, "")
+
+ try {
+ // Set <user, client-id> quota config
+ quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(10, true)))
+ checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+ // Remove <user, client-id> quota config, back to no quotas
+ quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), None)
+ checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+ } finally {
+ quotaManager.shutdown()
+ }
+ }
+
+ @Test
+ def testQuotaConfigPrecedence(): Unit = {
+ val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
+ newMetrics, Produce, time, "")
try {
quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, Some(new Quota(1000, true)))
@@ -216,47 +276,47 @@ class ClientQuotaManagerTest {
quotaManager.updateQuota(Some("userC"), None, None, Some(new Quota(10000, true)))
quotaManager.updateQuota(None, Some("client1"), Some("client1"), Some(new Quota(9000, true)))
- checkQuota("userA", "client1", 5000, 4500, false) // <user, client> quota takes precedence over <user>
- checkQuota("userA", "client2", 4000, 4500, true) // <user> quota takes precedence over <client> and defaults
- checkQuota("userA", "client3", 4000, 0, true) // <user> quota is shared across clients of user
- checkQuota("userA", "client1", 5000, 0, false) // <user, client> is exclusive use, unaffected by other clients
+ checkQuota(quotaManager, "userA", "client1", 5000, 4500, false) // <user, client> quota takes precedence over <user>
+ checkQuota(quotaManager, "userA", "client2", 4000, 4500, true) // <user> quota takes precedence over <client> and defaults
+ checkQuota(quotaManager, "userA", "client3", 4000, 0, true) // <user> quota is shared across clients of user
+ checkQuota(quotaManager, "userA", "client1", 5000, 0, false) // <user, client> is exclusive use, unaffected by other clients
- checkQuota("userB", "client1", 7000, 8000, true)
- checkQuota("userB", "client2", 8000, 7000, false) // Default per-client quota for exclusive use of <user, client>
- checkQuota("userB", "client3", 8000, 7000, false)
+ checkQuota(quotaManager, "userB", "client1", 7000, 8000, true)
+ checkQuota(quotaManager, "userB", "client2", 8000, 7000, false) // Default per-client quota for exclusive use of <user, client>
+ checkQuota(quotaManager, "userB", "client3", 8000, 7000, false)
- checkQuota("userD", "client1", 3000, 3500, true) // Default <user, client> quota
- checkQuota("userD", "client2", 3000, 2500, false)
- checkQuota("userE", "client1", 3000, 2500, false)
+ checkQuota(quotaManager, "userD", "client1", 3000, 3500, true) // Default <user, client> quota
+ checkQuota(quotaManager, "userD", "client2", 3000, 2500, false)
+ checkQuota(quotaManager, "userE", "client1", 3000, 2500, false)
// Remove default <user, client> quota config, revert to <user> default
quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), None)
- checkQuota("userD", "client1", 1000, 0, false) // Metrics tags changed, restart counter
- checkQuota("userE", "client4", 1000, 1500, true)
- checkQuota("userF", "client4", 1000, 800, false) // Default <user> quota shared across clients of user
- checkQuota("userF", "client5", 1000, 800, true)
+ checkQuota(quotaManager, "userD", "client1", 1000, 0, false) // Metrics tags changed, restart counter
+ checkQuota(quotaManager, "userE", "client4", 1000, 1500, true)
+ checkQuota(quotaManager, "userF", "client4", 1000, 800, false) // Default <user> quota shared across clients of user
+ checkQuota(quotaManager, "userF", "client5", 1000, 800, true)
// Remove default <user> quota config, revert to <client-id> default
quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, None)
- checkQuota("userF", "client4", 2000, 0, false) // Default <client-id> quota shared across client-id of all users
- checkQuota("userF", "client5", 2000, 0, false)
- checkQuota("userF", "client5", 2000, 2500, true)
- checkQuota("userG", "client5", 2000, 0, true)
+ checkQuota(quotaManager, "userF", "client4", 2000, 0, false) // Default <client-id> quota shared across client-id of all users
+ checkQuota(quotaManager, "userF", "client5", 2000, 0, false)
+ checkQuota(quotaManager, "userF", "client5", 2000, 2500, true)
+ checkQuota(quotaManager, "userG", "client5", 2000, 0, true)
// Update quotas
quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(8000, true)))
quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(10000, true)))
- checkQuota("userA", "client2", 8000, 0, false)
- checkQuota("userA", "client2", 8000, 4500, true) // Throttled due to sum of new and earlier values
- checkQuota("userA", "client1", 10000, 0, false)
- checkQuota("userA", "client1", 10000, 6000, true)
+ checkQuota(quotaManager, "userA", "client2", 8000, 0, false)
+ checkQuota(quotaManager, "userA", "client2", 8000, 4500, true) // Throttled due to sum of new and earlier values
+ checkQuota(quotaManager, "userA", "client1", 10000, 0, false)
+ checkQuota(quotaManager, "userA", "client1", 10000, 6000, true)
quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), None)
- checkQuota("userA", "client6", 8000, 0, true) // Throttled due to shared user quota
+ checkQuota(quotaManager, "userA", "client6", 8000, 0, true) // Throttled due to shared user quota
quotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), Some(new Quota(11000, true)))
- checkQuota("userA", "client6", 11000, 8500, false)
+ checkQuota(quotaManager, "userA", "client6", 11000, 8500, false)
quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(12000, true)))
quotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), None)
- checkQuota("userA", "client6", 12000, 4000, true) // Throttled due to sum of new and earlier values
+ checkQuota(quotaManager, "userA", "client6", 12000, 4000, true) // Throttled due to sum of new and earlier values
} finally {
quotaManager.shutdown()