You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/10 23:07:48 UTC

[kafka] branch 2.3 updated: KAFKA-9658; Fix user quota removal (#8232)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 2b26c23  KAFKA-9658; Fix user quota removal (#8232)
2b26c23 is described below

commit 2b26c23d36ef64a2a40044c2b7e50056ec924db8
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Tue Mar 10 12:30:58 2020 -0700

    KAFKA-9658; Fix user quota removal (#8232)
    
    Adding (add-config) default user, user, or <user, client-id> quota and then removing it via delete-config does not update quota bound in ClientQuotaManager.Metrics for existing users or <user,client-id>. This causes brokers to continue to throttle with the previously set quotas until brokers restart (or <user,client> stops sending traffic for sometime and sensor expires). This happens only when removing the user or user,client-id where there are no more quotas  to fall back to. Common [...]
    
    The cause of the issue was `DefaultQuotaCallback.quotaLimit` was returning `null` when no default user quota set, which caused `ClientQuotaManager.updateQuotaMetricConfigs` to skip updating the appropriate sensor, which left it unchanged with the previous quota. Since `null` is an acceptable return value for `ClientQuotaCallback.quotaLimit`, which is already treated as unlimited quota in other parts of the code, this PR ensures that `ClientQuotaManager.updateQuotaMetricConfigs` update [...]
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
 .../scala/kafka/server/ClientQuotaManager.scala    |   5 +-
 .../unit/kafka/server/ClientQuotaManagerTest.scala | 132 +++++++++++++++------
 2 files changed, 98 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 9817f21..87e4923 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -483,7 +483,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       // Change the underlying metric config if the sensor has been created
       val metric = allMetrics.get(quotaMetricName)
       if (metric != null) {
-        Option(quotaCallback.quotaLimit(clientQuotaType, metricTags.asJava)).foreach { newQuota =>
+        Option(quotaLimit(metricTags.asJava)).foreach { newQuota =>
           info(s"Sensor for $quotaEntity already exists. Changing quota to $newQuota in MetricConfig")
           metric.config(getQuotaMetricConfig(newQuota))
         }
@@ -493,8 +493,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
         case (metricName, metric) =>
           val metricTags = metricName.tags
-          Option(quotaCallback.quotaLimit(clientQuotaType, metricTags)).foreach { quota =>
-            val newQuota = quota.asInstanceOf[Double]
+          Option(quotaLimit(metricTags)).foreach { newQuota =>
             if (newQuota != metric.config.quota.bound) {
               info(s"Sensor for quota-id $metricTags already exists. Setting quota to $newQuota in MetricConfig")
               metric.config(getQuotaMetricConfig(newQuota))
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index e10d4b2..6598d3e 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -190,19 +190,79 @@ class ClientQuotaManagerTest {
     testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
   }
 
+  private def checkQuota(quotaManager: ClientQuotaManager, user: String, clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): Unit = {
+    assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
+    val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * config.numQuotaSamples)
+    if (expectThrottle)
+      assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
+    else
+      assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
+  }
+
   @Test
-  def testQuotaConfigPrecedence() {
-    val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
-        newMetrics, Produce, time, "")
-
-    def checkQuota(user: String, clientId: String, expectedBound: Int, value: Int, expectThrottle: Boolean) {
-      assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
-      val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * config.numQuotaSamples)
-      if (expectThrottle)
-        assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
-      else
-        assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
+  def testSetAndRemoveDefaultUserQuota(): Unit = {
+    // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+    val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue),
+      newMetrics, Produce, time, "")
+
+    try {
+      // no quota set yet, should not throttle
+      checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+
+      // Set default <user> quota config
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, Some(new Quota(10, true)))
+      checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+      // Remove default <user> quota config, back to no quotas
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, None)
+      checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+    } finally {
+      quotaManager.shutdown()
     }
+  }
+
+  @Test
+  def testSetAndRemoveUserQuota(): Unit = {
+    // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+    val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue),
+      newMetrics, Produce, time, "")
+
+    try {
+      // Set <user> quota config
+      quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(10, true)))
+      checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+      // Remove <user> quota config, back to no quotas
+      quotaManager.updateQuota(Some("userA"), None, None, None)
+      checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+    } finally {
+      quotaManager.shutdown()
+    }
+  }
+
+  @Test
+  def testSetAndRemoveUserClientQuota(): Unit = {
+    // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+    val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue),
+      newMetrics, Produce, time, "")
+
+    try {
+      // Set <user, client-id> quota config
+      quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(10, true)))
+      checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+      // Remove <user, client-id> quota config, back to no quotas
+      quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), None)
+      checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+    } finally {
+      quotaManager.shutdown()
+    }
+  }
+
+  @Test
+  def testQuotaConfigPrecedence(): Unit = {
+    val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
+      newMetrics, Produce, time, "")
 
     try {
       quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, Some(new Quota(1000, true)))
@@ -216,47 +276,47 @@ class ClientQuotaManagerTest {
       quotaManager.updateQuota(Some("userC"), None, None, Some(new Quota(10000, true)))
       quotaManager.updateQuota(None, Some("client1"), Some("client1"), Some(new Quota(9000, true)))
 
-      checkQuota("userA", "client1", 5000, 4500, false) // <user, client> quota takes precedence over <user>
-      checkQuota("userA", "client2", 4000, 4500, true)  // <user> quota takes precedence over <client> and defaults
-      checkQuota("userA", "client3", 4000, 0, true)     // <user> quota is shared across clients of user
-      checkQuota("userA", "client1", 5000, 0, false)    // <user, client> is exclusive use, unaffected by other clients
+      checkQuota(quotaManager, "userA", "client1", 5000, 4500, false) // <user, client> quota takes precedence over <user>
+      checkQuota(quotaManager, "userA", "client2", 4000, 4500, true)  // <user> quota takes precedence over <client> and defaults
+      checkQuota(quotaManager, "userA", "client3", 4000, 0, true)     // <user> quota is shared across clients of user
+      checkQuota(quotaManager, "userA", "client1", 5000, 0, false)    // <user, client> is exclusive use, unaffected by other clients
 
-      checkQuota("userB", "client1", 7000, 8000, true)
-      checkQuota("userB", "client2", 8000, 7000, false) // Default per-client quota for exclusive use of <user, client>
-      checkQuota("userB", "client3", 8000, 7000, false)
+      checkQuota(quotaManager, "userB", "client1", 7000, 8000, true)
+      checkQuota(quotaManager, "userB", "client2", 8000, 7000, false) // Default per-client quota for exclusive use of <user, client>
+      checkQuota(quotaManager, "userB", "client3", 8000, 7000, false)
 
-      checkQuota("userD", "client1", 3000, 3500, true)  // Default <user, client> quota
-      checkQuota("userD", "client2", 3000, 2500, false)
-      checkQuota("userE", "client1", 3000, 2500, false)
+      checkQuota(quotaManager, "userD", "client1", 3000, 3500, true)  // Default <user, client> quota
+      checkQuota(quotaManager, "userD", "client2", 3000, 2500, false)
+      checkQuota(quotaManager, "userE", "client1", 3000, 2500, false)
 
       // Remove default <user, client> quota config, revert to <user> default
       quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), None)
-      checkQuota("userD", "client1", 1000, 0, false)    // Metrics tags changed, restart counter
-      checkQuota("userE", "client4", 1000, 1500, true)
-      checkQuota("userF", "client4", 1000, 800, false)  // Default <user> quota shared across clients of user
-      checkQuota("userF", "client5", 1000, 800, true)
+      checkQuota(quotaManager, "userD", "client1", 1000, 0, false)    // Metrics tags changed, restart counter
+      checkQuota(quotaManager, "userE", "client4", 1000, 1500, true)
+      checkQuota(quotaManager, "userF", "client4", 1000, 800, false)  // Default <user> quota shared across clients of user
+      checkQuota(quotaManager, "userF", "client5", 1000, 800, true)
 
       // Remove default <user> quota config, revert to <client-id> default
       quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, None)
-      checkQuota("userF", "client4", 2000, 0, false)  // Default <client-id> quota shared across client-id of all users
-      checkQuota("userF", "client5", 2000, 0, false)
-      checkQuota("userF", "client5", 2000, 2500, true)
-      checkQuota("userG", "client5", 2000, 0, true)
+      checkQuota(quotaManager, "userF", "client4", 2000, 0, false)  // Default <client-id> quota shared across client-id of all users
+      checkQuota(quotaManager, "userF", "client5", 2000, 0, false)
+      checkQuota(quotaManager, "userF", "client5", 2000, 2500, true)
+      checkQuota(quotaManager, "userG", "client5", 2000, 0, true)
 
       // Update quotas
       quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(8000, true)))
       quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(10000, true)))
-      checkQuota("userA", "client2", 8000, 0, false)
-      checkQuota("userA", "client2", 8000, 4500, true) // Throttled due to sum of new and earlier values
-      checkQuota("userA", "client1", 10000, 0, false)
-      checkQuota("userA", "client1", 10000, 6000, true)
+      checkQuota(quotaManager, "userA", "client2", 8000, 0, false)
+      checkQuota(quotaManager, "userA", "client2", 8000, 4500, true) // Throttled due to sum of new and earlier values
+      checkQuota(quotaManager, "userA", "client1", 10000, 0, false)
+      checkQuota(quotaManager, "userA", "client1", 10000, 6000, true)
       quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), None)
-      checkQuota("userA", "client6", 8000, 0, true)    // Throttled due to shared user quota
+      checkQuota(quotaManager, "userA", "client6", 8000, 0, true)    // Throttled due to shared user quota
       quotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), Some(new Quota(11000, true)))
-      checkQuota("userA", "client6", 11000, 8500, false)
+      checkQuota(quotaManager, "userA", "client6", 11000, 8500, false)
       quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(12000, true)))
       quotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), None)
-      checkQuota("userA", "client6", 12000, 4000, true) // Throttled due to sum of new and earlier values
+      checkQuota(quotaManager, "userA", "client6", 12000, 4000, true) // Throttled due to sum of new and earlier values
 
     } finally {
       quotaManager.shutdown()