You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/07 11:28:43 UTC

kafka git commit: KAFKA-4267; Quota initialization for <user, clientId> uses incorrect ZK path

Repository: kafka
Updated Branches:
  refs/heads/trunk 3c8925946 -> 8b75a016d


KAFKA-4267; Quota initialization for <user, clientId> uses incorrect ZK path

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1987 from rajinisivaram/quota-init-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b75a016
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b75a016
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b75a016

Branch: refs/heads/trunk
Commit: 8b75a016db16e20c5d5180deb6859bf0ad4c48fd
Parents: 3c89259
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Fri Oct 7 12:10:53 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Oct 7 12:10:53 2016 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  2 +-
 .../kafka/server/DynamicConfigChangeTest.scala  | 31 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b75a016/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 96f09b0..aa38f69 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -608,7 +608,7 @@ object AdminUtils extends Logging with AdminUtilities {
   def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
     def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
       val root = rootPath match {
-        case Some(path) => rootEntityType + '/' + rootPath
+        case Some(path) => rootEntityType + '/' + path
         case None => rootEntityType
       }
       val entityNames = zkUtils.getAllEntitiesWithConfig(root)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b75a016/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 4b44b1f..faa23f0 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -129,6 +129,37 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
   }
 
   @Test
+  def testQuotaInitialization() {
+    val server = servers.head
+    val clientIdProps = new Properties()
+    server.shutdown()
+    clientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000")
+    clientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000")
+    val userProps = new Properties()
+    userProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "10000")
+    userProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "20000")
+    val userClientIdProps = new Properties()
+    userClientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "100000")
+    userClientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "200000")
+
+    AdminUtils.changeClientIdConfig(zkUtils, "overriddenClientId", clientIdProps)
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "overriddenUser", userProps)
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
+
+    // Remove config change znodes to force quota initialization only through loading of user/client quotas
+    zkUtils.getChildren(ZkUtils.EntityConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.EntityConfigChangesPath + "/" + p) }
+    server.startup()
+    val quotaManagers = server.apis.quotas
+
+    assertEquals(Quota.upperBound(1000),  quotaManagers.produce.quota("someuser", "overriddenClientId"))
+    assertEquals(Quota.upperBound(2000),  quotaManagers.fetch.quota("someuser", "overriddenClientId"))
+    assertEquals(Quota.upperBound(10000),  quotaManagers.produce.quota("overriddenUser", "someclientId"))
+    assertEquals(Quota.upperBound(20000),  quotaManagers.fetch.quota("overriddenUser", "someclientId"))
+    assertEquals(Quota.upperBound(100000),  quotaManagers.produce.quota("ANONYMOUS", "overriddenUserClientId"))
+    assertEquals(Quota.upperBound(200000),  quotaManagers.fetch.quota("ANONYMOUS", "overriddenUserClientId"))
+  }
+
+  @Test
   def testConfigChangeOnNonExistingTopic() {
     val topic = TestUtils.tempTopic
     try {