You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/22 21:25:58 UTC

[2/3] kafka git commit: KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 47d487e..3f51528 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -32,7 +32,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw
       "--replication-factor", replicationFactor.toString,
       "--disable-rack-aware",
       "--topic", "foo"))
-    kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
+    kafka.admin.TopicCommand.createTopic(zkClient, createOpts)
 
     val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
     val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 522fcd3..b701f13 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
 import kafka.admin.AdminClient
-import kafka.admin.AdminUtils
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
 import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
 import kafka.common.TopicAndPartition
@@ -447,9 +446,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
    */
   @Test
   def testAuthorizationWithTopicNotExisting() {
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    AdminUtils.deleteTopic(zkUtils, deleteTopic)
+    adminZkClient.deleteTopic(deleteTopic)
     TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
 
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index eadb488..0badda9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -22,7 +22,6 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import collection.JavaConverters._
-import kafka.admin.AdminUtils
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
@@ -375,7 +374,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
       case (topicPartition, replicas) => topicPartition.partition -> replicas
     }
-    AdminUtils.addPartitions(zkUtils, topic, existingAssignment, AdminUtils.getBrokerMetadatas(zkUtils), 2)
+    adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2)
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 383f139..3e08327 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -16,7 +16,6 @@ package kafka.api
 
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Sanitizer
@@ -54,6 +53,6 @@ class ClientIdQuotaTest extends BaseQuotaTest {
   }
 
   private def updateQuotaOverride(clientId: String, properties: Properties) {
-    AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(clientId), properties)
+    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(clientId), properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index cb5262d..0ae9d17 100644
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -18,7 +18,7 @@ package kafka.api
 
 import java.util.Properties
 
-import kafka.admin.{RackAwareMode, AdminUtils, RackAwareTest}
+import kafka.admin.{RackAwareMode, RackAwareTest}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
@@ -55,7 +55,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
       val assignment = zkUtils.getReplicaAssignmentForTopics(Seq(topic)).map { case (topicPartition, replicas) =>
         topicPartition.partition -> replicas
       }
-      val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+      val brokerMetadatas = adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
       val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
       assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
       checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index e25f886..453ac91 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -17,7 +17,6 @@ package kafka.api
 import java.io.File
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.server._
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Sanitizer
@@ -41,7 +40,7 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
     val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
+    adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
     waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
@@ -59,11 +58,11 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
 
   override def removeQuotaOverrides() {
     val emptyProps = new Properties
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
+    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
+    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
   }
 
   private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
+    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index b5d88c0..91a92fa 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -17,7 +17,6 @@ package kafka.api
 import java.io.File
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -45,7 +44,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
     val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default, defaultProps)
+    adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default, defaultProps)
     waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
@@ -67,6 +66,6 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
   }
 
   private def updateQuotaOverride(properties: Properties) {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal), properties)
+    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal), properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 075b1af..10c7737 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
 import kafka.utils.{Exit, TestUtils, ZkUtils}
@@ -60,7 +59,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
     // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
     // the metadata is propagated.
     def createTopic(zkUtils: ZkUtils, topic: String): Unit = {
-      AdminUtils.createTopic(zkUtils, topic, partitions = 1, replicationFactor = 2)
+      adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 79fc68f..d79ba32 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -73,7 +73,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testWrongReplicaCount(): Unit = {
     try {
-      AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+      adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 2,
         Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2))))
       fail("Add partitions should fail")
     } catch {
@@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testMissingPartition0(): Unit = {
     try {
-      AdminUtils.addPartitions(zkUtils, topic5, topic5Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+      adminZkClient.addPartitions(topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
         Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
       fail("Add partitions should fail")
     } catch {
@@ -95,7 +95,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testIncrementPartitions(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
+    adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 3)
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
     val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
@@ -123,7 +123,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testManualAssignmentOfReplicas(): Unit = {
     // Add 2 partitions
-    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3,
+    adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3,
       Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
@@ -152,7 +152,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testReplicaPlacementAllServers(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic3, topic3Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 7)
+    adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
@@ -179,7 +179,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testReplicaPlacementPartialServers(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
+    adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 87ce46e..acac907 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -21,8 +21,8 @@ import java.util.Properties
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import kafka.common.InvalidConfigException
 import kafka.server.ConfigEntityName
-import kafka.utils.{Logging, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.Logging
+import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.common.security.scram.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
 import org.easymock.EasyMock
@@ -95,7 +95,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   def shouldFailIfUnrecognisedEntityType(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test
@@ -106,14 +106,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
-    val configChange = new TestAdminUtils {
-      override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configChange: Properties): Unit = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def changeClientIdConfig(clientId: String, configChange: Properties): Unit = {
         assertEquals("my-client-id", clientId)
         assertEquals("b", configChange.get("a"))
         assertEquals("d", configChange.get("c"))
       }
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test
@@ -124,14 +125,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
-    val configChange = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
         assertEquals("my-topic", topic)
         assertEquals("b", configChange.get("a"))
         assertEquals("d", configChange.get("c"))
       }
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test
@@ -142,14 +144,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
-    val configChange = new TestAdminUtils {
-      override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
         assertEquals(Seq(1), brokerIds)
         assertEquals("b", configChange.get("a"))
         assertEquals("d", configChange.get("c"))
       }
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test
@@ -160,15 +163,18 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
 
-    val configChange = new TestAdminUtils {
-      override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
         assertEquals(Seq(1), brokerIds)
         assertEquals("b", configChange.get("a"))
         assertEquals("d,e ,f", configChange.get("c"))
         assertEquals("h,i", configChange.get("g"))
       }
+
+      override def changeTopicConfig(topic: String, configs: Properties): Unit = {}
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test (expected = classOf[IllegalArgumentException])
@@ -178,7 +184,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "a=b"))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test (expected = classOf[IllegalArgumentException])
@@ -188,7 +194,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "a="))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test (expected = classOf[IllegalArgumentException])
@@ -198,7 +204,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "a=[b,c,d=e"))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test (expected = classOf[InvalidConfigException])
@@ -208,7 +214,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-type", "topics",
       "--alter",
       "--delete-config", "missing_config1, missing_config2"))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test
@@ -219,8 +225,8 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--delete-config", "a,c"))
 
-    val configChange = new TestAdminUtils {
-      override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
         val properties: Properties = new Properties
         properties.put("a", "b")
         properties.put("c", "d")
@@ -228,12 +234,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         properties
       }
 
-      override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+      override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
         assertEquals("f", configChange.get("e"))
         assertEquals(1, configChange.size())
       }
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test
@@ -253,11 +260,11 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         "--delete-config", mechanism))
 
     val credentials = mutable.Map[String, Properties]()
-    case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends TestAdminUtils {
-      override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+    case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends AdminZkClient(zkClient) {
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
         credentials.getOrElse(entityName, new Properties())
       }
-      override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configChange: Properties): Unit = {
+      override def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configChange: Properties): Unit = {
         assertEquals(user, sanitizedEntityName)
         assertEquals(mechanisms, configChange.keySet().asScala)
         for (mechanism <- mechanisms) {
@@ -397,17 +404,17 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def testQuotaDescribeEntities() {
-    val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+    val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
 
     def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) {
       val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))
       expectedFetches.foreach {
-        case (name, values) => EasyMock.expect(zkUtils.getAllEntitiesWithConfig(name)).andReturn(values)
+        case (name, values) => EasyMock.expect(zkClient.getAllEntitiesWithConfig(name)).andReturn(values)
       }
-      EasyMock.replay(zkUtils)
-      val entities = entity.getAllEntities(zkUtils)
+      EasyMock.replay(zkClient)
+      val entities = entity.getAllEntities(zkClient)
       assertEquals(expectedEntityNames, entities.map(e => e.fullSanitizedName))
-      EasyMock.reset(zkUtils)
+      EasyMock.reset(zkClient)
     }
 
     val clientId = "a-client"
@@ -456,4 +463,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         Map("users" -> Seq("<default>", sanitizedPrincipal)) ++ defaultUserMap ++ userMap,
         Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
   }
+
+  case class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+    override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
+    override def fetchEntityConfig(entityType: String, entityName: String): Properties = {new Properties}
+    override def changeClientIdConfig(clientId: String, configs: Properties): Unit = {}
+    override def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties): Unit = {}
+    override def changeTopicConfig(topic: String, configs: Properties): Unit = {}
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 78f022a..1922aaf 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -46,7 +46,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = "test"
     servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
   }
 
@@ -61,7 +61,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     follower.shutdown()
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     // check if all replicas but the one that is shut down has deleted the log
     TestUtils.waitUntilTrue(() =>
       servers.filter(s => s.config.brokerId != follower.config.brokerId)
@@ -86,7 +86,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     follower.shutdown()
 
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     // shut down the controller to trigger controller failover during delete topic
     controller.shutdown()
 
@@ -112,7 +112,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     this.servers = allServers
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
@@ -121,7 +121,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     follower.shutdown()
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
     // the topic is being deleted
     // reassign partition 0
@@ -155,16 +155,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last
     val newPartition = new TopicPartition(topic, 1)
     // capture the brokers before we shutdown so that we don't fail validation in `addPartitions`
-    val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
+    val brokers = adminZkClient.getBrokerMetadatas()
     follower.shutdown()
     // wait until the broker has been removed from ZK to reduce non-determinism
     TestUtils.waitUntilTrue(() => zkUtils.getBrokerInfo(follower.config.brokerId).isEmpty,
       s"Follower ${follower.config.brokerId} was not removed from ZK")
     // add partitions to topic
-    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+    adminZkClient.addPartitions(topic, expectedReplicaAssignment, brokers, 2,
       Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     follower.startup()
     // test if topic deletion is resumed
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -178,12 +178,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   def testAddPartitionDuringDeleteTopic() {
     val topic = "test"
     servers = createTestTopicAndCluster(topic)
-    val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
+    val brokers = adminZkClient.getBrokerMetadatas()
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     // add partitions to topic
     val newPartition = new TopicPartition(topic, 1)
-    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+    adminZkClient.addPartitions(topic, expectedReplicaAssignment, brokers, 2,
       Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
@@ -198,10 +198,10 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topicPartition = new TopicPartition(topic, 0)
     servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // re-create topic on same replicas
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // wait until leader is elected
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
     // check if all replica logs are created
@@ -216,7 +216,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     servers = createTestTopicAndCluster(topic)
     // start topic deletion
     try {
-      AdminUtils.deleteTopic(zkUtils, "test2")
+      adminZkClient.deleteTopic("test2")
       fail("Expected UnknownTopicOrPartitionException")
     } catch {
       case _: UnknownTopicOrPartitionException => // expected exception
@@ -258,7 +258,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
    server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
     // delete topic
-    AdminUtils.deleteTopic(zkUtils, "test")
+    adminZkClient.deleteTopic("test")
     TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers)
   }
 
@@ -270,9 +270,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
     try {
       // start topic deletion
-      AdminUtils.deleteTopic(zkUtils, topic)
+      adminZkClient.deleteTopic(topic)
       // try to delete topic marked as deleted
-      AdminUtils.deleteTopic(zkUtils, topic)
+      adminZkClient.deleteTopic(topic)
       fail("Expected TopicAlreadyMarkedForDeletionException")
     }
     catch {
@@ -293,7 +293,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created")
@@ -316,7 +316,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = topicPartition.topic
     servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
     // mark the topic for deletion
-    AdminUtils.deleteTopic(zkUtils, "test")
+    adminZkClient.deleteTopic("test")
     TestUtils.waitUntilTrue(() => !zkUtils.isTopicMarkedForDeletion(topic),
       "Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic))
     // verify that topic test is untouched

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 7000308..e367372 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -56,7 +56,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
   }
 
   @After
@@ -234,7 +234,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     val topic2 = "foo2"
-    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+    adminZkClient.createTopic(topic2, 2, 1)
 
     // run two consumers in the group consuming from a two-partition topic
     consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 6727fad..4b22898 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -30,7 +30,6 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
-
 class ListConsumerGroupTest extends KafkaServerTestHarness {
 
   val overridingProps = new Properties()
@@ -47,7 +46,7 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
     props.setProperty("group.id", group)
     props.setProperty("zookeeper.connect", zkConnect)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 6853b16..8227487 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -88,7 +88,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
 
@@ -98,13 +98,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 1 (specific offset).")
 
     printConsumerGroup("new.group")
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
     consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsToLocalDateTime() {
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
     val calendar = Calendar.getInstance()
@@ -144,12 +144,12 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
 
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
   def testResetOffsetsToZonedDateTime() {
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
     TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
 
     val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
@@ -188,7 +188,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
 
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -230,7 +230,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -241,7 +241,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
 
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -250,7 +250,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -260,7 +260,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 100 (latest by duration).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -269,7 +269,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -279,7 +279,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -288,7 +288,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -301,7 +301,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 200 (latest).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -310,7 +310,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -322,7 +322,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 100 (current).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
@@ -351,7 +351,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -362,7 +362,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 1 (specific offset).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -371,7 +371,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -383,7 +383,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 150 (current + 50).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -392,7 +392,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -405,7 +405,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 50 (current - 50).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -414,7 +414,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -426,7 +426,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest by shift).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -435,7 +435,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -447,7 +447,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 200 (latest by shift).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -456,7 +456,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -466,7 +466,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -475,7 +475,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+    adminZkClient.createTopic(topic1, 2, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
 
@@ -485,7 +485,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -498,8 +498,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
-    AdminUtils.createTopic(zkUtils, topic2, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
+    adminZkClient.createTopic(topic2, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic2, 100)
@@ -511,8 +511,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
-    AdminUtils.deleteTopic(zkUtils, topic2)
+    adminZkClient.deleteTopic(topic1)
+    adminZkClient.deleteTopic(topic2)
   }
 
   @Test
@@ -525,8 +525,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
-    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+    adminZkClient.createTopic(topic1, 2, 1)
+    adminZkClient.createTopic(topic2, 2, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
     produceConsumeAndShutdown(consumerGroupCommand, 2, topic2, 100)
@@ -538,8 +538,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
-    AdminUtils.deleteTopic(zkUtils, topic2)
+    adminZkClient.deleteTopic(topic1)
+    adminZkClient.deleteTopic(topic2)
   }
 
   @Test
@@ -548,7 +548,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+    adminZkClient.createTopic(topic1, 2, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
 
@@ -575,7 +575,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     file.deleteOnExit()
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   private def printConsumerGroup() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 53efa34..180f257 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -44,8 +44,8 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
       "--replication-factor", "1",
       "--config", cleanupKey + "=" + cleanupVal,
       "--topic", topic))
-    TopicCommand.createTopic(zkUtils, createOpts)
-    val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+    TopicCommand.createTopic(zkClient, createOpts)
+    val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
     assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
     assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
 
@@ -55,8 +55,8 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // modify the topic to add new partitions
     val numPartitionsModified = 3
     val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
-    TopicCommand.alterTopic(zkUtils, alterOpts)
-    val newProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+    TopicCommand.alterTopic(zkClient, alterOpts)
+    val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
     assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
     assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
   }
@@ -75,27 +75,27 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
       "--topic", normalTopic))
-    TopicCommand.createTopic(zkUtils, createOpts)
+    TopicCommand.createTopic(zkClient, createOpts)
 
     // delete the NormalTopic
     val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
     val deletePath = getDeleteTopicPath(normalTopic)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deletePath))
-    TopicCommand.deleteTopic(zkUtils, deleteOpts)
+    TopicCommand.deleteTopic(zkClient, deleteOpts)
     assertTrue("Delete path for topic should exist after deletion.", zkUtils.pathExists(deletePath))
 
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
       "--topic", Topic.GROUP_METADATA_TOPIC_NAME))
-    TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
+    TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
 
     // try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
     val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
     val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
-      TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
+      TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
     }
     assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
   }
@@ -109,12 +109,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // delete a topic that does not exist without --if-exists
     val deleteOpts = new TopicCommandOptions(Array("--topic", "test"))
     intercept[IllegalArgumentException] {
-      TopicCommand.deleteTopic(zkUtils, deleteOpts)
+      TopicCommand.deleteTopic(zkClient, deleteOpts)
     }
 
     // delete a topic that does not exist with --if-exists
     val deleteExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
-    TopicCommand.deleteTopic(zkUtils, deleteExistsOpts)
+    TopicCommand.deleteTopic(zkClient, deleteExistsOpts)
   }
 
   @Test
@@ -126,12 +126,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // alter a topic that does not exist without --if-exists
     val alterOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1"))
     intercept[IllegalArgumentException] {
-      TopicCommand.alterTopic(zkUtils, alterOpts)
+      TopicCommand.alterTopic(zkClient, alterOpts)
     }
 
     // alter a topic that does not exist with --if-exists
     val alterExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1", "--if-exists"))
-    TopicCommand.alterTopic(zkUtils, alterExistsOpts)
+    TopicCommand.alterTopic(zkClient, alterExistsOpts)
   }
 
   @Test
@@ -146,17 +146,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // create the topic
     val createOpts = new TopicCommandOptions(
       Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic))
-    TopicCommand.createTopic(zkUtils, createOpts)
+    TopicCommand.createTopic(zkClient, createOpts)
 
     // try to re-create the topic without --if-not-exists
     intercept[TopicExistsException] {
-      TopicCommand.createTopic(zkUtils, createOpts)
+      TopicCommand.createTopic(zkClient, createOpts)
     }
 
     // try to re-create the topic with --if-not-exists
     val createNotExistsOpts = new TopicCommandOptions(
       Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic, "--if-not-exists"))
-    TopicCommand.createTopic(zkUtils, createNotExistsOpts)
+    TopicCommand.createTopic(zkClient, createNotExistsOpts)
   }
 
   @Test
@@ -170,7 +170,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
       "--partitions", numPartitions.toString,
       "--replication-factor", replicationFactor.toString,
       "--topic", "foo"))
-    TopicCommand.createTopic(zkUtils, createOpts)
+    TopicCommand.createTopic(zkClient, createOpts)
 
     var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
       tp.partition -> replicas
@@ -182,7 +182,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     val alterOpts = new TopicCommandOptions(Array(
       "--partitions", alteredNumPartitions.toString,
       "--topic", "foo"))
-    TopicCommand.alterTopic(zkUtils, alterOpts)
+    TopicCommand.alterTopic(zkClient, alterOpts)
     assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
       tp.partition -> replicas
     }
@@ -198,28 +198,28 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     TestUtils.createBrokersInZk(zkUtils, brokers)
 
     val createOpts = new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", topic))
-    TopicCommand.createTopic(zkUtils, createOpts)
+    TopicCommand.createTopic(zkClient, createOpts)
 
     // delete the broker first, so when we attempt to delete the topic it gets into "marked for deletion"
     TestUtils.deleteBrokersInZk(zkUtils, brokers)
-    TopicCommand.deleteTopic(zkUtils, new TopicCommandOptions(Array("--topic", topic)))
+    TopicCommand.deleteTopic(zkClient, new TopicCommandOptions(Array("--topic", topic)))
 
     // Test describe topics
     def describeTopicsWithConfig() {
-      TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe")))
+      TopicCommand.describeTopic(zkClient, new TopicCommandOptions(Array("--describe")))
     }
     val outputWithConfig = TestUtils.grabConsoleOutput(describeTopicsWithConfig)
     assertTrue(outputWithConfig.contains(topic) && outputWithConfig.contains(markedForDeletionDescribe))
 
     def describeTopicsNoConfig() {
-      TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
+      TopicCommand.describeTopic(zkClient, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
     }
     val outputNoConfig = TestUtils.grabConsoleOutput(describeTopicsNoConfig)
     assertTrue(outputNoConfig.contains(topic) && outputNoConfig.contains(markedForDeletionDescribe))
 
     // Test list topics
     def listTopics() {
-      TopicCommand.listTopics(zkUtils, new TopicCommandOptions(Array("--list")))
+      TopicCommand.listTopics(zkClient, new TopicCommandOptions(Array("--list")))
     }
     val output = TestUtils.grabConsoleOutput(listTopics)
     assertTrue(output.contains(topic) && output.contains(markedForDeletionList))

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 04f9bea..32e23cc 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -20,7 +20,6 @@ package kafka.controller
 import java.util.Properties
 import java.util.concurrent.CountDownLatch
 
-import kafka.admin.AdminUtils
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils._
@@ -60,7 +59,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
     }
     val initialEpoch = initialController.epoch
     // Create topic with one partition
-    AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
     val topicPartition = new TopicPartition("topic1", 0)
     TestUtils.waitUntilTrue(() =>
       initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index bec5026..daa4276 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -26,7 +26,6 @@ import org.junit.{Before, Test}
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Gauge
 
-
 class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with Logging {
 
   private val nodesNum = 3
@@ -136,7 +135,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
       // Create topics
       for (t <- topics if running) {
         try {
-          kafka.admin.AdminUtils.createTopic(zkUtils, t, partitionNum, replicationFactor)
+          adminZkClient.createTopic(t, partitionNum, replicationFactor)
         } catch {
           case e: Exception => e.printStackTrace
         }
@@ -146,7 +145,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
       // Delete topics
       for (t <- topics if running) {
           try {
-              kafka.admin.AdminUtils.deleteTopic(zkUtils, t)
+              adminZkClient.deleteTopic(t)
           } catch {
           case e: Exception => e.printStackTrace
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6e9e8ff..e93dcf6 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.integration
 
-import kafka.admin.AdminUtils
 import kafka.api.TopicMetadataResponse
 import kafka.client.ClientUtils
 import kafka.cluster.BrokerEndPoint
@@ -219,7 +218,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
 
     // create topic
     val topic: String = "test"
-    AdminUtils.createTopic(zkUtils, topic, 1, numBrokers)
+    adminZkClient.createTopic(topic, 1, numBrokers)
 
     // shutdown a broker
     adHocServers.last.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 24421d0..7732e38 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -25,7 +25,6 @@ import org.apache.log4j.{Level, Logger}
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import kafka.admin.AdminUtils
 import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.serializer.StringDecoder
 import kafka.server.{KafkaConfig, KafkaServer}
@@ -109,7 +108,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
 
     verifyUncleanLeaderElectionEnabled
   }
@@ -121,7 +120,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
 
     verifyUncleanLeaderElectionDisabled
   }
@@ -136,7 +135,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", "true")
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
       topicProps)
 
     verifyUncleanLeaderElectionEnabled
@@ -153,7 +152,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", "false")
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
       topicProps)
 
     verifyUncleanLeaderElectionDisabled
@@ -168,7 +167,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     topicProps.put("unclean.leader.election.enable", "invalid")
 
     intercept[ConfigException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
+      adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1)), topicProps)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 0dcff53..c250d85 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -28,7 +28,6 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import kafka.serializer._
 import kafka.utils._
-import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
 
 import scala.collection._
@@ -73,8 +72,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testMetricsReporterAfterDeletingTopic() {
     val topic = "test-topic-metric"
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.createTopic(topic, 1, 1)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
   }
@@ -82,13 +81,13 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
     val topic = "test-broker-topic-metric"
-    AdminUtils.createTopic(zkUtils, topic, 2, 1)
+    adminZkClient.createTopic(topic, 2, 1)
     // Produce a few messages to create the metrics
     // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
     TestUtils.produceMessages(servers, topic, nMessages)
     assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
     servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index fa1174d..a0680a2 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -125,7 +125,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     props.put("request.required.acks", "0")
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    AdminUtils.createTopic(zkUtils, "test", 1, 1)
+    adminZkClient.createTopic("test", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0)
 
     // This message will be dropped silently since message size too large.
@@ -167,9 +167,9 @@ class SyncProducerTest extends KafkaServerTestHarness {
     }
 
     // #2 - test that we get correct offsets when partition is owned by broker
-    AdminUtils.createTopic(zkUtils, "topic1", 1, 1)
+    adminZkClient.createTopic("topic1", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic1", 0)
-    AdminUtils.createTopic(zkUtils, "topic3", 1, 1)
+    adminZkClient.createTopic("topic3", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic3", 0)
 
     val response2 = producer.send(request)
@@ -243,7 +243,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas","2")
-    AdminUtils.createTopic(zkUtils, topicName, 1, 1,topicProps)
+    adminZkClient.createTopic(topicName, 1, 1,topicProps)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topicName, 0)
 
     val response = producer.send(produceRequest(topicName, 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index dd7bb5f..85bd6a1 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -26,7 +26,7 @@ import org.easymock.EasyMock
 import org.junit.Test
 import kafka.integration.KafkaServerTestHarness
 import kafka.utils._
-import kafka.admin.{AdminOperationException, AdminUtils}
+import kafka.admin.AdminOperationException
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.Map
@@ -43,14 +43,14 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val tp = new TopicPartition("test", 0)
     val logProps = new Properties()
     logProps.put(FlushMessagesProp, oldVal.toString)
-    AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
+    adminZkClient.createTopic(tp.topic, 1, 1, logProps)
     TestUtils.retry(10000) {
       val logOpt = this.servers.head.logManager.getLog(tp)
       assertTrue(logOpt.isDefined)
       assertEquals(oldVal, logOpt.get.config.flushInterval)
     }
     logProps.put(FlushMessagesProp, newVal.toString)
-    AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
+    adminZkClient.changeTopicConfig(tp.topic, logProps)
     TestUtils.retry(10000) {
       assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval)
     }
@@ -65,8 +65,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     val quotaManagers = servers.head.apis.quotas
     rootEntityType match {
-      case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, props)
-      case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, configEntityName, props)
+      case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props)
+      case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props)
     }
 
     TestUtils.retry(10000) {
@@ -84,8 +84,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     val emptyProps = new Properties()
     rootEntityType match {
-      case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, emptyProps)
-      case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, configEntityName, emptyProps)
+      case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, emptyProps)
+      case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, emptyProps)
     }
     TestUtils.retry(10000) {
       val producerQuota = quotaManagers.produce.quota(user, clientId)
@@ -142,9 +142,9 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     userClientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "100000")
     userClientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "200000")
 
-    AdminUtils.changeClientIdConfig(zkUtils, "overriddenClientId", clientIdProps)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "overriddenUser", userProps)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
+    adminZkClient.changeClientIdConfig("overriddenClientId", clientIdProps)
+    adminZkClient.changeUserOrUserClientIdConfig("overriddenUser", userProps)
+    adminZkClient.changeUserOrUserClientIdConfig("ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
 
     // Remove config change znodes to force quota initialization only through loading of user/client quotas
     zkUtils.getChildren(ZkUtils.ConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.ConfigChangesPath + "/" + p) }
@@ -165,7 +165,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     try {
       val logProps = new Properties()
       logProps.put(FlushMessagesProp, 10000: java.lang.Integer)
-      AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
+      adminZkClient.changeTopicConfig(topic, logProps)
       fail("Should fail with AdminOperationException for topic doesn't exist")
     } catch {
       case _: AdminOperationException => // expected
@@ -187,7 +187,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     EasyMock.expectLastCall().once()
     EasyMock.replay(handler)
 
-    val configManager = new DynamicConfigManager(zkUtils, zkClient, Map(ConfigType.Topic -> handler))
+    val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
     // Notifications created using the old TopicConfigManager are ignored.
     configManager.ConfigChangedNotificationHandler.processNotification("not json")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 9e6b1b2..b2378cf 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -16,50 +16,39 @@
   */
 package kafka.server
 
-import kafka.admin.AdminUtils
-import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.common.config._
-import org.easymock.EasyMock
-import org.junit.{Before, Test}
 import kafka.utils.CoreUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.config._
+import org.junit.Test
 
-class DynamicConfigTest {
+class DynamicConfigTest  extends ZooKeeperTestHarness {
   private final val nonExistentConfig: String = "some.config.that.does.not.exist"
   private final val someValue: String = "some interesting value"
 
-  var zkUtils: ZkUtils = _
-
-  @Before
-  def setUp() {
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
-  }
-
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingBrokerUnknownConfig() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), propsWith(nonExistentConfig, someValue))
+    adminZkClient.changeBrokerConfig(Seq(0), propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingClientIdUnknownConfig() {
-    AdminUtils.changeClientIdConfig(zkUtils, "ClientId", propsWith(nonExistentConfig, someValue))
+    adminZkClient.changeClientIdConfig("ClientId", propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingUserUnknownConfig() {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "UserId", propsWith(nonExistentConfig, someValue))
+    adminZkClient.changeUserOrUserClientIdConfig("UserId", propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[ConfigException])
   def shouldFailLeaderConfigsWithInvalidValues() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+    adminZkClient.changeBrokerConfig(Seq(0),
       propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "-100"))
   }
 
   @Test(expected = classOf[ConfigException])
   def shouldFailFollowerConfigsWithInvalidValues() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+    adminZkClient.changeBrokerConfig(Seq(0),
       propsWith(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "-100"))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cabbd5d..a51acd0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -90,7 +90,6 @@ class KafkaApisTest {
       groupCoordinator,
       txnCoordinator,
       controller,
-      zkUtils,
       zkClient,
       brokerId,
       new KafkaConfig(properties),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 306dbc0..b0a7d72 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, Random}
 
-import kafka.admin.AdminUtils
 import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.common.TopicAndPartition
 import kafka.consumer.SimpleConsumer
@@ -81,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in ZooKeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
     waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
@@ -116,7 +115,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in ZooKeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
     waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
@@ -179,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in ZooKeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkUtils, topic, 3, 1)
+    adminZkClient.createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)
@@ -208,7 +207,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in ZooKeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkUtils, topic, 3, 1)
+    adminZkClient.createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index bb7e9fe..61e17e3 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -30,8 +30,6 @@ import org.junit.Assert._
 import java.util.Properties
 import java.io.File
 
-import kafka.admin.AdminUtils
-
 import scala.util.Random
 import scala.collection._
 
@@ -319,7 +317,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     assertEquals(Errors.NONE, commitResponse.commitStatus.get(topicPartition).get)
 
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, Seq(server))
     Thread.sleep(retentionCheckInterval * 2)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index f162492..45a6bdd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -19,8 +19,6 @@ package kafka.server
 
 import java.util.Properties
 
-import kafka.admin.AdminUtils
-import kafka.admin.AdminUtils._
 import kafka.log.LogConfig._
 import kafka.server.KafkaConfig.fromProps
 import kafka.server.QuotaType._
@@ -80,7 +78,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet)
     //And two extra partitions 6,7, which we don't intend on throttling.
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
       0 -> Seq(100, 106), //Throttled
       1 -> Seq(101, 106), //Throttled
       2 -> Seq(102, 106), //Throttled
@@ -99,7 +97,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Set the throttle limit on all 8 brokers, but only assign throttled replicas to the six leaders, or two followers
     (100 to 107).foreach { brokerId =>
-      changeBrokerConfig(zkUtils, Seq(brokerId),
+      adminZkClient.changeBrokerConfig(Seq(brokerId),
         propsWith(
           (DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString),
           (DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
@@ -108,9 +106,9 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Either throttle the six leaders or the two followers
     if (leaderThrottle)
-      changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
+      adminZkClient.changeTopicConfig(topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
     else
-      changeTopicConfig(zkUtils, topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
+      adminZkClient.changeTopicConfig(topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
 
     //Add data equally to each partition
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
@@ -178,7 +176,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val config: Properties = createBrokerConfig(100, zkConnect)
     config.put("log.segment.bytes", (1024 * 1024).toString)
     brokers = Seq(createServer(fromProps(config)))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
 
     //Write 20MBs and throttle at 5MB/s
     val msg = msg100KB
@@ -187,8 +185,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val throttle: Long = msg.length * msgCount / expectedDuration
 
     //Set the throttle to only limit leader
-    changeBrokerConfig(zkUtils, Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
-    changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
+    adminZkClient.changeBrokerConfig(Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
+    adminZkClient.changeTopicConfig(topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
 
     //Add data
     addData(msgCount, msg)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 4774e1d..ee75933 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -19,7 +19,6 @@ import java.nio.ByteBuffer
 import java.util.{Collections, LinkedHashMap, Properties}
 import java.util.concurrent.{Executors, Future, TimeUnit}
 
-import kafka.admin.AdminUtils
 import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
@@ -81,9 +80,9 @@ class RequestQuotaTest extends BaseRequestTest {
     // Change default client-id request quota to a small value and a single unthrottledClient with a large quota
     val quotaProps = new Properties()
     quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
-    AdminUtils.changeClientIdConfig(zkUtils, "<default>", quotaProps)
+    adminZkClient.changeClientIdConfig("<default>", quotaProps)
     quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
-    AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(unthrottledClientId), quotaProps)
+    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
 
     TestUtils.retry(10000) {
       val quotaManager = servers.head.apis.quotas.request