You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/22 21:25:58 UTC
[2/3] kafka git commit: KAFKA-5646;
Use KafkaZkClient in DynamicConfigManager and AdminManager
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 47d487e..3f51528 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -32,7 +32,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw
"--replication-factor", replicationFactor.toString,
"--disable-rack-aware",
"--topic", "foo"))
- kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
+ kafka.admin.TopicCommand.createTopic(zkClient, createOpts)
val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 522fcd3..b701f13 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Properties}
import kafka.admin.AdminClient
-import kafka.admin.AdminUtils
import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
import kafka.common.TopicAndPartition
@@ -447,9 +446,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
*/
@Test
def testAuthorizationWithTopicNotExisting() {
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
- AdminUtils.deleteTopic(zkUtils, deleteTopic)
+ adminZkClient.deleteTopic(deleteTopic)
TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index eadb488..0badda9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -22,7 +22,6 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import collection.JavaConverters._
-import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.KafkaConfig
@@ -375,7 +374,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
- AdminUtils.addPartitions(zkUtils, topic, existingAssignment, AdminUtils.getBrokerMetadatas(zkUtils), 2)
+ adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 383f139..3e08327 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -16,7 +16,6 @@ package kafka.api
import java.util.Properties
-import kafka.admin.AdminUtils
import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Sanitizer
@@ -54,6 +53,6 @@ class ClientIdQuotaTest extends BaseQuotaTest {
}
private def updateQuotaOverride(clientId: String, properties: Properties) {
- AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(clientId), properties)
+ adminZkClient.changeClientIdConfig(Sanitizer.sanitize(clientId), properties)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index cb5262d..0ae9d17 100644
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -18,7 +18,7 @@ package kafka.api
import java.util.Properties
-import kafka.admin.{RackAwareMode, AdminUtils, RackAwareTest}
+import kafka.admin.{RackAwareMode, RackAwareTest}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
@@ -55,7 +55,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
val assignment = zkUtils.getReplicaAssignmentForTopics(Seq(topic)).map { case (topicPartition, replicas) =>
topicPartition.partition -> replicas
}
- val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+ val brokerMetadatas = adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index e25f886..453ac91 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -17,7 +17,6 @@ package kafka.api
import java.io.File
import java.util.Properties
-import kafka.admin.AdminUtils
import kafka.server._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Sanitizer
@@ -41,7 +40,7 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp()
val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
+ adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
@@ -59,11 +58,11 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
override def removeQuotaOverrides() {
val emptyProps = new Properties
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
+ adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
+ adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
}
private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) {
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
+ adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index b5d88c0..91a92fa 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -17,7 +17,6 @@ package kafka.api
import java.io.File
import java.util.Properties
-import kafka.admin.AdminUtils
import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -45,7 +44,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp()
val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default, defaultProps)
+ adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default, defaultProps)
waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
@@ -67,6 +66,6 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
}
private def updateQuotaOverride(properties: Properties) {
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal), properties)
+ adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal), properties)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 075b1af..10c7737 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -19,7 +19,6 @@ package kafka.server
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.admin.AdminUtils
import kafka.cluster.BrokerEndPoint
import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
import kafka.utils.{Exit, TestUtils, ZkUtils}
@@ -60,7 +59,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
// Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
// the metadata is propagated.
def createTopic(zkUtils: ZkUtils, topic: String): Unit = {
- AdminUtils.createTopic(zkUtils, topic, partitions = 1, replicationFactor = 2)
+ adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 79fc68f..d79ba32 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -73,7 +73,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testWrongReplicaCount(): Unit = {
try {
- AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+ adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 2,
Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2))))
fail("Add partitions should fail")
} catch {
@@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testMissingPartition0(): Unit = {
try {
- AdminUtils.addPartitions(zkUtils, topic5, topic5Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+ adminZkClient.addPartitions(topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
fail("Add partitions should fail")
} catch {
@@ -95,7 +95,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testIncrementPartitions(): Unit = {
- AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
+ adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 3)
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
@@ -123,7 +123,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testManualAssignmentOfReplicas(): Unit = {
// Add 2 partitions
- AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3,
+ adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3,
Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
@@ -152,7 +152,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testReplicaPlacementAllServers(): Unit = {
- AdminUtils.addPartitions(zkUtils, topic3, topic3Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 7)
+ adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
@@ -179,7 +179,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testReplicaPlacementPartialServers(): Unit = {
- AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
+ adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 87ce46e..acac907 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -21,8 +21,8 @@ import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.common.InvalidConfigException
import kafka.server.ConfigEntityName
-import kafka.utils.{Logging, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.Logging
+import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.common.security.scram.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
import org.easymock.EasyMock
@@ -95,7 +95,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
def shouldFailIfUnrecognisedEntityType(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test
@@ -106,14 +106,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=d"))
- val configChange = new TestAdminUtils {
- override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configChange: Properties): Unit = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeClientIdConfig(clientId: String, configChange: Properties): Unit = {
assertEquals("my-client-id", clientId)
assertEquals("b", configChange.get("a"))
assertEquals("d", configChange.get("c"))
}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test
@@ -124,14 +125,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=d"))
- val configChange = new TestAdminUtils {
- override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
assertEquals("my-topic", topic)
assertEquals("b", configChange.get("a"))
assertEquals("d", configChange.get("c"))
}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test
@@ -142,14 +144,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=d"))
- val configChange = new TestAdminUtils {
- override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals(Seq(1), brokerIds)
assertEquals("b", configChange.get("a"))
assertEquals("d", configChange.get("c"))
}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test
@@ -160,15 +163,18 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
- val configChange = new TestAdminUtils {
- override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals(Seq(1), brokerIds)
assertEquals("b", configChange.get("a"))
assertEquals("d,e ,f", configChange.get("c"))
assertEquals("h,i", configChange.get("g"))
}
+
+ override def changeTopicConfig(topic: String, configs: Properties): Unit = {}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test (expected = classOf[IllegalArgumentException])
@@ -178,7 +184,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "a=b"))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test (expected = classOf[IllegalArgumentException])
@@ -188,7 +194,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "a="))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test (expected = classOf[IllegalArgumentException])
@@ -198,7 +204,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "a=[b,c,d=e"))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test (expected = classOf[InvalidConfigException])
@@ -208,7 +214,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "topics",
"--alter",
"--delete-config", "missing_config1, missing_config2"))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test
@@ -219,8 +225,8 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--delete-config", "a,c"))
- val configChange = new TestAdminUtils {
- override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
val properties: Properties = new Properties
properties.put("a", "b")
properties.put("c", "d")
@@ -228,12 +234,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
properties
}
- override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+ override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals("f", configChange.get("e"))
assertEquals(1, configChange.size())
}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test
@@ -253,11 +260,11 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--delete-config", mechanism))
val credentials = mutable.Map[String, Properties]()
- case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends TestAdminUtils {
- override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+ case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends AdminZkClient(zkClient) {
+ override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
credentials.getOrElse(entityName, new Properties())
}
- override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configChange: Properties): Unit = {
+ override def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configChange: Properties): Unit = {
assertEquals(user, sanitizedEntityName)
assertEquals(mechanisms, configChange.keySet().asScala)
for (mechanism <- mechanisms) {
@@ -397,17 +404,17 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@Test
def testQuotaDescribeEntities() {
- val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+ val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) {
val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))
expectedFetches.foreach {
- case (name, values) => EasyMock.expect(zkUtils.getAllEntitiesWithConfig(name)).andReturn(values)
+ case (name, values) => EasyMock.expect(zkClient.getAllEntitiesWithConfig(name)).andReturn(values)
}
- EasyMock.replay(zkUtils)
- val entities = entity.getAllEntities(zkUtils)
+ EasyMock.replay(zkClient)
+ val entities = entity.getAllEntities(zkClient)
assertEquals(expectedEntityNames, entities.map(e => e.fullSanitizedName))
- EasyMock.reset(zkUtils)
+ EasyMock.reset(zkClient)
}
val clientId = "a-client"
@@ -456,4 +463,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
Map("users" -> Seq("<default>", sanitizedPrincipal)) ++ defaultUserMap ++ userMap,
Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
}
+
+ case class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
+ override def fetchEntityConfig(entityType: String, entityName: String): Properties = {new Properties}
+ override def changeClientIdConfig(clientId: String, configs: Properties): Unit = {}
+ override def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties): Unit = {}
+ override def changeTopicConfig(topic: String, configs: Properties): Unit = {}
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 78f022a..1922aaf 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -46,7 +46,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topic = "test"
servers = createTestTopicAndCluster(topic)
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
}
@@ -61,7 +61,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// check if all replicas but the one that is shut down has deleted the log
TestUtils.waitUntilTrue(() =>
servers.filter(s => s.config.brokerId != follower.config.brokerId)
@@ -86,7 +86,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// shut down the controller to trigger controller failover during delete topic
controller.shutdown()
@@ -112,7 +112,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
this.servers = allServers
val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
"Replicas for topic test not created.")
@@ -121,7 +121,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
// the topic is being deleted
// reassign partition 0
@@ -155,16 +155,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last
val newPartition = new TopicPartition(topic, 1)
// capture the brokers before we shutdown so that we don't fail validation in `addPartitions`
- val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
+ val brokers = adminZkClient.getBrokerMetadatas()
follower.shutdown()
// wait until the broker has been removed from ZK to reduce non-determinism
TestUtils.waitUntilTrue(() => zkUtils.getBrokerInfo(follower.config.brokerId).isEmpty,
s"Follower ${follower.config.brokerId} was not removed from ZK")
// add partitions to topic
- AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+ adminZkClient.addPartitions(topic, expectedReplicaAssignment, brokers, 2,
Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
follower.startup()
// test if topic deletion is resumed
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -178,12 +178,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
def testAddPartitionDuringDeleteTopic() {
val topic = "test"
servers = createTestTopicAndCluster(topic)
- val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
+ val brokers = adminZkClient.getBrokerMetadatas()
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// add partitions to topic
val newPartition = new TopicPartition(topic, 1)
- AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+ adminZkClient.addPartitions(topic, expectedReplicaAssignment, brokers, 2,
Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
@@ -198,10 +198,10 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, 0)
servers = createTestTopicAndCluster(topic)
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
// re-create topic on same replicas
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
// wait until leader is elected
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
// check if all replica logs are created
@@ -216,7 +216,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
servers = createTestTopicAndCluster(topic)
// start topic deletion
try {
- AdminUtils.deleteTopic(zkUtils, "test2")
+ adminZkClient.deleteTopic("test2")
fail("Expected UnknownTopicOrPartitionException")
} catch {
case _: UnknownTopicOrPartitionException => // expected exception
@@ -258,7 +258,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
// delete topic
- AdminUtils.deleteTopic(zkUtils, "test")
+ adminZkClient.deleteTopic("test")
TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers)
}
@@ -270,9 +270,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
try {
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// try to delete topic marked as deleted
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
fail("Expected TopicAlreadyMarkedForDeletionException")
}
catch {
@@ -293,7 +293,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// create brokers
val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
"Replicas for topic test not created")
@@ -316,7 +316,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topic = topicPartition.topic
servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
// mark the topic for deletion
- AdminUtils.deleteTopic(zkUtils, "test")
+ adminZkClient.deleteTopic("test")
TestUtils.waitUntilTrue(() => !zkUtils.isTopicMarkedForDeletion(topic),
"Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic))
// verify that topic test is untouched
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 7000308..e367372 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -56,7 +56,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
@Before
override def setUp() {
super.setUp()
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
}
@After
@@ -234,7 +234,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() {
TestUtils.createOffsetsTopic(zkUtils, servers)
val topic2 = "foo2"
- AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+ adminZkClient.createTopic(topic2, 2, 1)
// run two consumers in the group consuming from a two-partition topic
consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 6727fad..4b22898 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -30,7 +30,6 @@ import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
-
class ListConsumerGroupTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
@@ -47,7 +46,7 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
override def setUp() {
super.setUp()
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
props.setProperty("group.id", group)
props.setProperty("zookeeper.connect", zkConnect)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 6853b16..8227487 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -88,7 +88,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = new KafkaConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
@@ -98,13 +98,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 1 (specific offset).")
printConsumerGroup("new.group")
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
consumerGroupCommand.close()
}
@Test
def testResetOffsetsToLocalDateTime() {
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
val calendar = Calendar.getInstance()
@@ -144,12 +144,12 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
def testResetOffsetsToZonedDateTime() {
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
@@ -188,7 +188,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -230,7 +230,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -241,7 +241,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -250,7 +250,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -260,7 +260,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 100 (latest by duration).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -269,7 +269,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -279,7 +279,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -288,7 +288,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -301,7 +301,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 200 (latest).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -310,7 +310,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -322,7 +322,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 100 (current).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
@@ -351,7 +351,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -362,7 +362,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 1 (specific offset).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -371,7 +371,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -383,7 +383,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 150 (current + 50).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -392,7 +392,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -405,7 +405,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 50 (current - 50).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -414,7 +414,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -426,7 +426,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest by shift).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -435,7 +435,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -447,7 +447,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 200 (latest by shift).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -456,7 +456,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -466,7 +466,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -475,7 +475,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+ adminZkClient.createTopic(topic1, 2, 1)
produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
@@ -485,7 +485,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -498,8 +498,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
- AdminUtils.createTopic(zkUtils, topic2, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
+ adminZkClient.createTopic(topic2, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic2, 100)
@@ -511,8 +511,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
- AdminUtils.deleteTopic(zkUtils, topic2)
+ adminZkClient.deleteTopic(topic1)
+ adminZkClient.deleteTopic(topic2)
}
@Test
@@ -525,8 +525,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 2, 1)
- AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+ adminZkClient.createTopic(topic1, 2, 1)
+ adminZkClient.createTopic(topic2, 2, 1)
produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
produceConsumeAndShutdown(consumerGroupCommand, 2, topic2, 100)
@@ -538,8 +538,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
- AdminUtils.deleteTopic(zkUtils, topic2)
+ adminZkClient.deleteTopic(topic1)
+ adminZkClient.deleteTopic(topic2)
}
@Test
@@ -548,7 +548,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+ adminZkClient.createTopic(topic1, 2, 1)
produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
@@ -575,7 +575,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
file.deleteOnExit()
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
private def printConsumerGroup() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 53efa34..180f257 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -44,8 +44,8 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
"--replication-factor", "1",
"--config", cleanupKey + "=" + cleanupVal,
"--topic", topic))
- TopicCommand.createTopic(zkUtils, createOpts)
- val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ TopicCommand.createTopic(zkClient, createOpts)
+ val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
@@ -55,8 +55,8 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// modify the topic to add new partitions
val numPartitionsModified = 3
val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
- TopicCommand.alterTopic(zkUtils, alterOpts)
- val newProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ TopicCommand.alterTopic(zkClient, alterOpts)
+ val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
}
@@ -75,27 +75,27 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", normalTopic))
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
val deletePath = getDeleteTopicPath(normalTopic)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deletePath))
- TopicCommand.deleteTopic(zkUtils, deleteOpts)
+ TopicCommand.deleteTopic(zkClient, deleteOpts)
assertTrue("Delete path for topic should exist after deletion.", zkUtils.pathExists(deletePath))
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", Topic.GROUP_METADATA_TOPIC_NAME))
- TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
+ TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
// try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
- TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
+ TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
}
assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
}
@@ -109,12 +109,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// delete a topic that does not exist without --if-exists
val deleteOpts = new TopicCommandOptions(Array("--topic", "test"))
intercept[IllegalArgumentException] {
- TopicCommand.deleteTopic(zkUtils, deleteOpts)
+ TopicCommand.deleteTopic(zkClient, deleteOpts)
}
// delete a topic that does not exist with --if-exists
val deleteExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
- TopicCommand.deleteTopic(zkUtils, deleteExistsOpts)
+ TopicCommand.deleteTopic(zkClient, deleteExistsOpts)
}
@Test
@@ -126,12 +126,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// alter a topic that does not exist without --if-exists
val alterOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1"))
intercept[IllegalArgumentException] {
- TopicCommand.alterTopic(zkUtils, alterOpts)
+ TopicCommand.alterTopic(zkClient, alterOpts)
}
// alter a topic that does not exist with --if-exists
val alterExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1", "--if-exists"))
- TopicCommand.alterTopic(zkUtils, alterExistsOpts)
+ TopicCommand.alterTopic(zkClient, alterExistsOpts)
}
@Test
@@ -146,17 +146,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// create the topic
val createOpts = new TopicCommandOptions(
Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic))
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
// try to re-create the topic without --if-not-exists
intercept[TopicExistsException] {
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
}
// try to re-create the topic with --if-not-exists
val createNotExistsOpts = new TopicCommandOptions(
Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic, "--if-not-exists"))
- TopicCommand.createTopic(zkUtils, createNotExistsOpts)
+ TopicCommand.createTopic(zkClient, createNotExistsOpts)
}
@Test
@@ -170,7 +170,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
"--partitions", numPartitions.toString,
"--replication-factor", replicationFactor.toString,
"--topic", "foo"))
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
tp.partition -> replicas
@@ -182,7 +182,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
val alterOpts = new TopicCommandOptions(Array(
"--partitions", alteredNumPartitions.toString,
"--topic", "foo"))
- TopicCommand.alterTopic(zkUtils, alterOpts)
+ TopicCommand.alterTopic(zkClient, alterOpts)
assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
tp.partition -> replicas
}
@@ -198,28 +198,28 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
TestUtils.createBrokersInZk(zkUtils, brokers)
val createOpts = new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", topic))
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
// delete the broker first, so when we attempt to delete the topic it gets into "marked for deletion"
TestUtils.deleteBrokersInZk(zkUtils, brokers)
- TopicCommand.deleteTopic(zkUtils, new TopicCommandOptions(Array("--topic", topic)))
+ TopicCommand.deleteTopic(zkClient, new TopicCommandOptions(Array("--topic", topic)))
// Test describe topics
def describeTopicsWithConfig() {
- TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe")))
+ TopicCommand.describeTopic(zkClient, new TopicCommandOptions(Array("--describe")))
}
val outputWithConfig = TestUtils.grabConsoleOutput(describeTopicsWithConfig)
assertTrue(outputWithConfig.contains(topic) && outputWithConfig.contains(markedForDeletionDescribe))
def describeTopicsNoConfig() {
- TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
+ TopicCommand.describeTopic(zkClient, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
}
val outputNoConfig = TestUtils.grabConsoleOutput(describeTopicsNoConfig)
assertTrue(outputNoConfig.contains(topic) && outputNoConfig.contains(markedForDeletionDescribe))
// Test list topics
def listTopics() {
- TopicCommand.listTopics(zkUtils, new TopicCommandOptions(Array("--list")))
+ TopicCommand.listTopics(zkClient, new TopicCommandOptions(Array("--list")))
}
val output = TestUtils.grabConsoleOutput(listTopics)
assertTrue(output.contains(topic) && output.contains(markedForDeletionList))
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 04f9bea..32e23cc 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -20,7 +20,6 @@ package kafka.controller
import java.util.Properties
import java.util.concurrent.CountDownLatch
-import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils._
@@ -60,7 +59,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
}
val initialEpoch = initialController.epoch
// Create topic with one partition
- AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
val topicPartition = new TopicPartition("topic1", 0)
TestUtils.waitUntilTrue(() =>
initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index bec5026..daa4276 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -26,7 +26,6 @@ import org.junit.{Before, Test}
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge
-
class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with Logging {
private val nodesNum = 3
@@ -136,7 +135,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
// Create topics
for (t <- topics if running) {
try {
- kafka.admin.AdminUtils.createTopic(zkUtils, t, partitionNum, replicationFactor)
+ adminZkClient.createTopic(t, partitionNum, replicationFactor)
} catch {
case e: Exception => e.printStackTrace
}
@@ -146,7 +145,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
// Delete topics
for (t <- topics if running) {
try {
- kafka.admin.AdminUtils.deleteTopic(zkUtils, t)
+ adminZkClient.deleteTopic(t)
} catch {
case e: Exception => e.printStackTrace
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6e9e8ff..e93dcf6 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,7 +17,6 @@
package kafka.integration
-import kafka.admin.AdminUtils
import kafka.api.TopicMetadataResponse
import kafka.client.ClientUtils
import kafka.cluster.BrokerEndPoint
@@ -219,7 +218,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
// create topic
val topic: String = "test"
- AdminUtils.createTopic(zkUtils, topic, 1, numBrokers)
+ adminZkClient.createTopic(topic, 1, numBrokers)
// shutdown a broker
adHocServers.last.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 24421d0..7732e38 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -25,7 +25,6 @@ import org.apache.log4j.{Level, Logger}
import java.util.Properties
import java.util.concurrent.ExecutionException
-import kafka.admin.AdminUtils
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.StringDecoder
import kafka.server.{KafkaConfig, KafkaServer}
@@ -109,7 +108,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
startBrokers(Seq(configProps1, configProps2))
// create topic with 1 partition, 2 replicas, one on each broker
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
verifyUncleanLeaderElectionEnabled
}
@@ -121,7 +120,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
startBrokers(Seq(configProps1, configProps2))
// create topic with 1 partition, 2 replicas, one on each broker
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
verifyUncleanLeaderElectionDisabled
}
@@ -136,7 +135,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
val topicProps = new Properties()
topicProps.put("unclean.leader.election.enable", "true")
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
topicProps)
verifyUncleanLeaderElectionEnabled
@@ -153,7 +152,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
val topicProps = new Properties()
topicProps.put("unclean.leader.election.enable", "false")
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
topicProps)
verifyUncleanLeaderElectionDisabled
@@ -168,7 +167,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
topicProps.put("unclean.leader.election.enable", "invalid")
intercept[ConfigException] {
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1)), topicProps)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 0dcff53..c250d85 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -28,7 +28,6 @@ import kafka.integration.KafkaServerTestHarness
import kafka.server._
import kafka.serializer._
import kafka.utils._
-import kafka.admin.AdminUtils
import kafka.utils.TestUtils._
import scala.collection._
@@ -73,8 +72,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
@Test
def testMetricsReporterAfterDeletingTopic() {
val topic = "test-topic-metric"
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.createTopic(topic, 1, 1)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
}
@@ -82,13 +81,13 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
@Test
def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
val topic = "test-broker-topic-metric"
- AdminUtils.createTopic(zkUtils, topic, 2, 1)
+ adminZkClient.createTopic(topic, 2, 1)
// Produce a few messages to create the metrics
// Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
TestUtils.produceMessages(servers, topic, nMessages)
assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index fa1174d..a0680a2 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -125,7 +125,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
props.put("request.required.acks", "0")
val producer = new SyncProducer(new SyncProducerConfig(props))
- AdminUtils.createTopic(zkUtils, "test", 1, 1)
+ adminZkClient.createTopic("test", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0)
// This message will be dropped silently since message size too large.
@@ -167,9 +167,9 @@ class SyncProducerTest extends KafkaServerTestHarness {
}
// #2 - test that we get correct offsets when partition is owned by broker
- AdminUtils.createTopic(zkUtils, "topic1", 1, 1)
+ adminZkClient.createTopic("topic1", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic1", 0)
- AdminUtils.createTopic(zkUtils, "topic3", 1, 1)
+ adminZkClient.createTopic("topic3", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic3", 0)
val response2 = producer.send(request)
@@ -243,7 +243,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val producer = new SyncProducer(new SyncProducerConfig(props))
val topicProps = new Properties()
topicProps.put("min.insync.replicas","2")
- AdminUtils.createTopic(zkUtils, topicName, 1, 1,topicProps)
+ adminZkClient.createTopic(topicName, 1, 1,topicProps)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topicName, 0)
val response = producer.send(produceRequest(topicName, 0,
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index dd7bb5f..85bd6a1 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -26,7 +26,7 @@ import org.easymock.EasyMock
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
-import kafka.admin.{AdminOperationException, AdminUtils}
+import kafka.admin.AdminOperationException
import org.apache.kafka.common.TopicPartition
import scala.collection.Map
@@ -43,14 +43,14 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
val tp = new TopicPartition("test", 0)
val logProps = new Properties()
logProps.put(FlushMessagesProp, oldVal.toString)
- AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
+ adminZkClient.createTopic(tp.topic, 1, 1, logProps)
TestUtils.retry(10000) {
val logOpt = this.servers.head.logManager.getLog(tp)
assertTrue(logOpt.isDefined)
assertEquals(oldVal, logOpt.get.config.flushInterval)
}
logProps.put(FlushMessagesProp, newVal.toString)
- AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
+ adminZkClient.changeTopicConfig(tp.topic, logProps)
TestUtils.retry(10000) {
assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval)
}
@@ -65,8 +65,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
val quotaManagers = servers.head.apis.quotas
rootEntityType match {
- case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, props)
- case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, configEntityName, props)
+ case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props)
+ case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props)
}
TestUtils.retry(10000) {
@@ -84,8 +84,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
val emptyProps = new Properties()
rootEntityType match {
- case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, emptyProps)
- case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, configEntityName, emptyProps)
+ case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, emptyProps)
+ case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, emptyProps)
}
TestUtils.retry(10000) {
val producerQuota = quotaManagers.produce.quota(user, clientId)
@@ -142,9 +142,9 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
userClientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "100000")
userClientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "200000")
- AdminUtils.changeClientIdConfig(zkUtils, "overriddenClientId", clientIdProps)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "overriddenUser", userProps)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
+ adminZkClient.changeClientIdConfig("overriddenClientId", clientIdProps)
+ adminZkClient.changeUserOrUserClientIdConfig("overriddenUser", userProps)
+ adminZkClient.changeUserOrUserClientIdConfig("ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
// Remove config change znodes to force quota initialization only through loading of user/client quotas
zkUtils.getChildren(ZkUtils.ConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.ConfigChangesPath + "/" + p) }
@@ -165,7 +165,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
try {
val logProps = new Properties()
logProps.put(FlushMessagesProp, 10000: java.lang.Integer)
- AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
+ adminZkClient.changeTopicConfig(topic, logProps)
fail("Should fail with AdminOperationException for topic doesn't exist")
} catch {
case _: AdminOperationException => // expected
@@ -187,7 +187,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
EasyMock.expectLastCall().once()
EasyMock.replay(handler)
- val configManager = new DynamicConfigManager(zkUtils, zkClient, Map(ConfigType.Topic -> handler))
+ val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
// Notifications created using the old TopicConfigManager are ignored.
configManager.ConfigChangedNotificationHandler.processNotification("not json")
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 9e6b1b2..b2378cf 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -16,50 +16,39 @@
*/
package kafka.server
-import kafka.admin.AdminUtils
-import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.common.config._
-import org.easymock.EasyMock
-import org.junit.{Before, Test}
import kafka.utils.CoreUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.config._
+import org.junit.Test
-class DynamicConfigTest {
+class DynamicConfigTest extends ZooKeeperTestHarness {
private final val nonExistentConfig: String = "some.config.that.does.not.exist"
private final val someValue: String = "some interesting value"
- var zkUtils: ZkUtils = _
-
- @Before
- def setUp() {
- val zkClient = EasyMock.createMock(classOf[ZkClient])
- zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
- }
-
@Test(expected = classOf[IllegalArgumentException])
def shouldFailWhenChangingBrokerUnknownConfig() {
- AdminUtils.changeBrokerConfig(zkUtils, Seq(0), propsWith(nonExistentConfig, someValue))
+ adminZkClient.changeBrokerConfig(Seq(0), propsWith(nonExistentConfig, someValue))
}
@Test(expected = classOf[IllegalArgumentException])
def shouldFailWhenChangingClientIdUnknownConfig() {
- AdminUtils.changeClientIdConfig(zkUtils, "ClientId", propsWith(nonExistentConfig, someValue))
+ adminZkClient.changeClientIdConfig("ClientId", propsWith(nonExistentConfig, someValue))
}
@Test(expected = classOf[IllegalArgumentException])
def shouldFailWhenChangingUserUnknownConfig() {
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "UserId", propsWith(nonExistentConfig, someValue))
+ adminZkClient.changeUserOrUserClientIdConfig("UserId", propsWith(nonExistentConfig, someValue))
}
@Test(expected = classOf[ConfigException])
def shouldFailLeaderConfigsWithInvalidValues() {
- AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+ adminZkClient.changeBrokerConfig(Seq(0),
propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "-100"))
}
@Test(expected = classOf[ConfigException])
def shouldFailFollowerConfigsWithInvalidValues() {
- AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+ adminZkClient.changeBrokerConfig(Seq(0),
propsWith(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "-100"))
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cabbd5d..a51acd0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -90,7 +90,6 @@ class KafkaApisTest {
groupCoordinator,
txnCoordinator,
controller,
- zkUtils,
zkClient,
brokerId,
new KafkaConfig(properties),
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 306dbc0..b0a7d72 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, Random}
-import kafka.admin.AdminUtils
import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
@@ -81,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in ZooKeeper as owners of partitions for this test
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
val logManager = server.getLogManager
waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
@@ -116,7 +115,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in ZooKeeper as owners of partitions for this test
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
val logManager = server.getLogManager
waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
@@ -179,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in ZooKeeper as owners of partitions for this test
- AdminUtils.createTopic(zkUtils, topic, 3, 1)
+ adminZkClient.createTopic(topic, 3, 1)
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)
@@ -208,7 +207,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in ZooKeeper as owners of partitions for this test
- AdminUtils.createTopic(zkUtils, topic, 3, 1)
+ adminZkClient.createTopic(topic, 3, 1)
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index bb7e9fe..61e17e3 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -30,8 +30,6 @@ import org.junit.Assert._
import java.util.Properties
import java.io.File
-import kafka.admin.AdminUtils
-
import scala.util.Random
import scala.collection._
@@ -319,7 +317,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
assertEquals(Errors.NONE, commitResponse.commitStatus.get(topicPartition).get)
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, Seq(server))
Thread.sleep(retentionCheckInterval * 2)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index f162492..45a6bdd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -19,8 +19,6 @@ package kafka.server
import java.util.Properties
-import kafka.admin.AdminUtils
-import kafka.admin.AdminUtils._
import kafka.log.LogConfig._
import kafka.server.KafkaConfig.fromProps
import kafka.server.QuotaType._
@@ -80,7 +78,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
//Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet)
//And two extra partitions 6,7, which we don't intend on throttling.
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
0 -> Seq(100, 106), //Throttled
1 -> Seq(101, 106), //Throttled
2 -> Seq(102, 106), //Throttled
@@ -99,7 +97,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
//Set the throttle limit on all 8 brokers, but only assign throttled replicas to the six leaders, or two followers
(100 to 107).foreach { brokerId =>
- changeBrokerConfig(zkUtils, Seq(brokerId),
+ adminZkClient.changeBrokerConfig(Seq(brokerId),
propsWith(
(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString),
(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
@@ -108,9 +106,9 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
//Either throttle the six leaders or the two followers
if (leaderThrottle)
- changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
+ adminZkClient.changeTopicConfig(topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
else
- changeTopicConfig(zkUtils, topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
+ adminZkClient.changeTopicConfig(topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
//Add data equally to each partition
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
@@ -178,7 +176,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
val config: Properties = createBrokerConfig(100, zkConnect)
config.put("log.segment.bytes", (1024 * 1024).toString)
brokers = Seq(createServer(fromProps(config)))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
//Write 20MBs and throttle at 5MB/s
val msg = msg100KB
@@ -187,8 +185,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
val throttle: Long = msg.length * msgCount / expectedDuration
//Set the throttle to only limit leader
- changeBrokerConfig(zkUtils, Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
- changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
+ adminZkClient.changeBrokerConfig(Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
+ adminZkClient.changeTopicConfig(topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
//Add data
addData(msgCount, msg)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 4774e1d..ee75933 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -19,7 +19,6 @@ import java.nio.ByteBuffer
import java.util.{Collections, LinkedHashMap, Properties}
import java.util.concurrent.{Executors, Future, TimeUnit}
-import kafka.admin.AdminUtils
import kafka.log.LogConfig
import kafka.network.RequestChannel.Session
import kafka.security.auth._
@@ -81,9 +80,9 @@ class RequestQuotaTest extends BaseRequestTest {
// Change default client-id request quota to a small value and a single unthrottledClient with a large quota
val quotaProps = new Properties()
quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
- AdminUtils.changeClientIdConfig(zkUtils, "<default>", quotaProps)
+ adminZkClient.changeClientIdConfig("<default>", quotaProps)
quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
- AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(unthrottledClientId), quotaProps)
+ adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
TestUtils.retry(10000) {
val quotaManager = servers.head.apis.quotas.request