You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/10/07 11:28:43 UTC
kafka git commit: KAFKA-4267;
Quota initialization for uses incorrect ZK path
Repository: kafka
Updated Branches:
refs/heads/trunk 3c8925946 -> 8b75a016d
KAFKA-4267; Quota initialization for <user, clientId> uses incorrect ZK path
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1987 from rajinisivaram/quota-init-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b75a016
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b75a016
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b75a016
Branch: refs/heads/trunk
Commit: 8b75a016db16e20c5d5180deb6859bf0ad4c48fd
Parents: 3c89259
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Fri Oct 7 12:10:53 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Oct 7 12:10:53 2016 +0100
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
.../kafka/server/DynamicConfigChangeTest.scala | 31 ++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b75a016/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 96f09b0..aa38f69 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -608,7 +608,7 @@ object AdminUtils extends Logging with AdminUtilities {
def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
val root = rootPath match {
- case Some(path) => rootEntityType + '/' + rootPath
+ case Some(path) => rootEntityType + '/' + path
case None => rootEntityType
}
val entityNames = zkUtils.getAllEntitiesWithConfig(root)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b75a016/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 4b44b1f..faa23f0 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -129,6 +129,37 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
@Test
+ def testQuotaInitialization() {
+ val server = servers.head
+ val clientIdProps = new Properties()
+ server.shutdown()
+ clientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000")
+ clientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000")
+ val userProps = new Properties()
+ userProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "10000")
+ userProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "20000")
+ val userClientIdProps = new Properties()
+ userClientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "100000")
+ userClientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "200000")
+
+ AdminUtils.changeClientIdConfig(zkUtils, "overriddenClientId", clientIdProps)
+ AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "overriddenUser", userProps)
+ AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
+
+ // Remove config change znodes to force quota initialization only through loading of user/client quotas
+ zkUtils.getChildren(ZkUtils.EntityConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.EntityConfigChangesPath + "/" + p) }
+ server.startup()
+ val quotaManagers = server.apis.quotas
+
+ assertEquals(Quota.upperBound(1000), quotaManagers.produce.quota("someuser", "overriddenClientId"))
+ assertEquals(Quota.upperBound(2000), quotaManagers.fetch.quota("someuser", "overriddenClientId"))
+ assertEquals(Quota.upperBound(10000), quotaManagers.produce.quota("overriddenUser", "someclientId"))
+ assertEquals(Quota.upperBound(20000), quotaManagers.fetch.quota("overriddenUser", "someclientId"))
+ assertEquals(Quota.upperBound(100000), quotaManagers.produce.quota("ANONYMOUS", "overriddenUserClientId"))
+ assertEquals(Quota.upperBound(200000), quotaManagers.fetch.quota("ANONYMOUS", "overriddenUserClientId"))
+ }
+
+ @Test
def testConfigChangeOnNonExistingTopic() {
val topic = TestUtils.tempTopic
try {