You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/22 21:25:57 UTC

[1/3] kafka git commit: KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager

Repository: kafka
Updated Branches:
  refs/heads/trunk c5f31fe38 -> bc852baff


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 2d672b6..a44f900 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -20,7 +20,6 @@ package kafka.server.epoch
 import java.io.{File, RandomAccessFile}
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.api.KAFKA_0_11_0_IV1
 import kafka.log.Log
 import kafka.server.KafkaConfig._
@@ -78,7 +77,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map(createBroker(_))
 
     //A single partition topic with 2 replicas
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
     producer = createProducer()
     val tp = new TopicPartition(topic, 0)
 
@@ -139,7 +138,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
 
     //A single partition topic with 2 replicas
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
       0 -> Seq(100, 101)
     ))
     producer = createProducer()
@@ -189,7 +188,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map(createBroker(_))
 
     //A single partition topic with 2 replicas
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
       0 -> Seq(100, 101)
     ))
     producer = bufferingProducer()
@@ -266,7 +265,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
     brokers = (100 to 101).map(createBroker(_))
 
     //A single partition topic with 2 replicas
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
     producer = createProducer()
 
     //Kick off with a single record

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 89c12fe..eaafb2a 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -18,7 +18,6 @@ package kafka.server.epoch
 
 import java.util.{Map => JMap}
 
-import kafka.admin.AdminUtils
 import kafka.server.KafkaConfig._
 import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
 import kafka.utils.TestUtils._
@@ -96,11 +95,11 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
     //3 brokers, put partition on 100/101 and then pretend to be 102
     brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic1, Map(
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic1, Map(
       0 -> Seq(100),
       1 -> Seq(101)
     ))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic2, Map(
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic2, Map(
       0 -> Seq(100)
     ))
 
@@ -144,7 +143,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     //Setup: we are only interested in the single partition on broker 101
     brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
     def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, tp.topic, Map(tp.partition -> Seq(101)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(tp.topic, Map(tp.partition -> Seq(101)))
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 10, acks = -1)
 
     //1. Given a single message

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
new file mode 100644
index 0000000..993dcf2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.util
+import java.util.Properties
+
+import kafka.log._
+import kafka.server.DynamicConfig.Broker._
+import kafka.server.KafkaConfig._
+import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
+import kafka.utils.CoreUtils._
+import kafka.utils.TestUtils._
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
+import org.apache.kafka.common.metrics.Quota
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.{After, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.{Map, immutable}
+
+class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
+
+  var servers: Seq[KafkaServer] = Seq()
+
+  @After
+  override def tearDown() {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
+  @Test
+  def testManualReplicaAssignment() {
+    val brokers = List(0, 1, 2, 3, 4)
+    TestUtils.createBrokersInZk(zkUtils, brokers)
+
+    // duplicate brokers
+    intercept[InvalidReplicaAssignmentException] {
+      adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK("test", Map(0->Seq(0,0)))
+    }
+
+    // inconsistent replication factor
+    intercept[InvalidReplicaAssignmentException] {
+      adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK("test", Map(0->Seq(0,1), 1->Seq(0)))
+    }
+
+    // good assignment
+    val assignment = Map(0 -> List(0, 1, 2),
+                         1 -> List(1, 2, 3))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK("test", assignment)
+    val found = zkClient.getPartitionAssignmentForTopics(Set("test"))
+    assertEquals(assignment, found("test"))
+  }
+
+  @Test
+  def testTopicCreationInZK() {
+    val expectedReplicaAssignment = Map(
+      0  -> List(0, 1, 2),
+      1  -> List(1, 2, 3),
+      2  -> List(2, 3, 4),
+      3  -> List(3, 4, 0),
+      4  -> List(4, 0, 1),
+      5  -> List(0, 2, 3),
+      6  -> List(1, 3, 4),
+      7  -> List(2, 4, 0),
+      8  -> List(3, 0, 1),
+      9  -> List(4, 1, 2),
+      10 -> List(1, 2, 3),
+      11 -> List(1, 3, 4)
+    )
+    val leaderForPartitionMap = immutable.Map(
+      0 -> 0,
+      1 -> 1,
+      2 -> 2,
+      3 -> 3,
+      4 -> 4,
+      5 -> 0,
+      6 -> 1,
+      7 -> 2,
+      8 -> 3,
+      9 -> 4,
+      10 -> 1,
+      11 -> 1
+    )
+    val topic = "test"
+    TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4))
+    // create the topic
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    // create leaders for all partitions
+    TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
+    val actualReplicaMap = leaderForPartitionMap.keys.map(p => p -> zkClient.getReplicasForPartition(new TopicPartition(topic, p))).toMap
+    assertEquals(expectedReplicaAssignment.size, actualReplicaMap.size)
+    for(i <- 0 until actualReplicaMap.size)
+      assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaMap(i))
+
+    intercept[TopicExistsException] {
+      // shouldn't be able to create a topic that already exists
+      adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    }
+  }
+
+  @Test
+  def testTopicCreationWithCollision() {
+    val topic = "test.topic"
+    val collidingTopic = "test_topic"
+    TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4))
+    // create the topic
+    adminZkClient.createTopic(topic, 3, 1)
+
+    intercept[InvalidTopicException] {
+      // shouldn't be able to create a topic that collides
+      adminZkClient.createTopic(collidingTopic, 3, 1)
+    }
+  }
+
+  @Test
+  def testConcurrentTopicCreation() {
+    val topic = "test.topic"
+
+    // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
+    val zkMock = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    EasyMock.expect(zkMock.topicExists(topic)).andReturn(false)
+    EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Seq("some.topic", topic, "some.other.topic"))
+    EasyMock.replay(zkMock)
+    val adminZkClient = new AdminZkClient(zkMock)
+
+    intercept[TopicExistsException] {
+      adminZkClient.validateCreateOrUpdateTopic(topic, Map.empty, new Properties, update = false)
+    }
+  }
+
+  /**
+   * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic,
+   * then changes the config and checks that the new values take effect.
+   */
+  @Test
+  def testTopicConfigChange() {
+    val partitions = 3
+    val topic = "my-topic"
+    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+    servers = Seq(server)
+
+    def makeConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String) = {
+      val props = new Properties()
+      props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
+      props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
+      props.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, throttledLeaders)
+      props.setProperty(LogConfig.FollowerReplicationThrottledReplicasProp, throttledFollowers)
+      props
+    }
+
+    def checkConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String, quotaManagerIsThrottled: Boolean) {
+      def checkList(actual: util.List[String], expected: String): Unit = {
+        assertNotNull(actual)
+        if (expected == "")
+          assertTrue(actual.isEmpty)
+        else
+          assertEquals(expected.split(",").toSeq, actual.asScala)
+      }
+      TestUtils.retry(10000) {
+        for (part <- 0 until partitions) {
+          val tp = new TopicPartition(topic, part)
+          val log = server.logManager.getLog(tp)
+          assertTrue(log.isDefined)
+          assertEquals(retentionMs, log.get.config.retentionMs)
+          assertEquals(messageSize, log.get.config.maxMessageSize)
+          checkList(log.get.config.LeaderReplicationThrottledReplicas, throttledLeaders)
+          checkList(log.get.config.FollowerReplicationThrottledReplicas, throttledFollowers)
+          assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(tp))
+        }
+      }
+    }
+
+    // create a topic with a few config overrides and check that they are applied
+    val maxMessageSize = 1024
+    val retentionMs = 1000 * 1000
+    adminZkClient.createTopic(topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+
+    //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated.
+    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false)
+
+    //Update dynamically and all properties should be applied
+    adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+
+    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true)
+
+    // now double the config values for the topic and check that it is applied
+    val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
+    adminZkClient.changeTopicConfig(topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
+    checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true)
+
+    // Verify that the same config can be read from ZK
+    val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+    assertEquals(newConfig, configInZk)
+
+    //Now delete the config
+    adminZkClient.changeTopicConfig(topic, new Properties)
+    checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
+
+    //Add config back
+    adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+    checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true)
+
+    //Now ensure updating to "" removes the throttled replica list also
+    adminZkClient.changeTopicConfig(topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
+    checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",  quotaManagerIsThrottled = false)
+  }
+
+  @Test
+  def shouldPropagateDynamicBrokerConfigs() {
+    val brokerIds = Seq(0, 1, 2)
+    servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_))
+
+    def checkConfig(limit: Long) {
+      retry(10000) {
+        for (server <- servers) {
+          assertEquals("Leader Quota Manager was not updated", limit, server.quotaManagers.leader.upperBound)
+          assertEquals("Follower Quota Manager was not updated", limit, server.quotaManagers.follower.upperBound)
+        }
+      }
+    }
+
+    val limit: Long = 1000000
+
+    // Set the limit & check it is applied to the leader and follower quota managers
+    adminZkClient.changeBrokerConfig(brokerIds, propsWith(
+      (LeaderReplicationThrottledRateProp, limit.toString),
+      (FollowerReplicationThrottledRateProp, limit.toString)))
+    checkConfig(limit)
+
+    // Now double the throttled rate for the brokers and check that it is applied
+    val newLimit = 2 * limit
+    adminZkClient.changeBrokerConfig(brokerIds,  propsWith(
+      (LeaderReplicationThrottledRateProp, newLimit.toString),
+      (FollowerReplicationThrottledRateProp, newLimit.toString)))
+    checkConfig(newLimit)
+
+    // Verify that the same config can be read from ZK
+    for (brokerId <- brokerIds) {
+      val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
+      assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
+      assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
+    }
+
+    //Now delete the config
+    adminZkClient.changeBrokerConfig(brokerIds, new Properties)
+    checkConfig(DefaultReplicationThrottledRate)
+  }
+
+  /**
+   * This test simulates a client config change in ZK whose notification has been purged.
+   * Basically, it asserts that client configs are bootstrapped from ZK when a broker starts, without a notification
+   */
+  @Test
+  def testBootstrapClientIdConfig() {
+    val clientId = "my-client"
+    val props = new Properties()
+    props.setProperty("producer_byte_rate", "1000")
+    props.setProperty("consumer_byte_rate", "2000")
+
+    // Write config without notification to ZK.
+    zkClient.setOrCreateEntityConfigs(ConfigType.Client, clientId, props)
+
+    val configInZk: Map[String, Properties] = adminZkClient.fetchAllEntityConfigs(ConfigType.Client)
+    assertEquals("Must have 1 overriden client config", 1, configInZk.size)
+    assertEquals(props, configInZk(clientId))
+
+    // Test that the existing clientId overrides are read
+    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+    servers = Seq(server)
+    assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+  }
+
+  @Test
+  def testGetBrokerMetadatas() {
+    // broker 4 has no rack information
+    val brokerList = 0 to 5
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3")
+    val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet))
+    TestUtils.createBrokersInZk(brokerMetadatas, zkUtils)
+
+    val processedMetadatas1 = adminZkClient.getBrokerMetadatas(RackAwareMode.Disabled)
+    assertEquals(brokerList, processedMetadatas1.map(_.id))
+    assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack))
+
+    val processedMetadatas2 = adminZkClient.getBrokerMetadatas(RackAwareMode.Safe)
+    assertEquals(brokerList, processedMetadatas2.map(_.id))
+    assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack))
+
+    intercept[AdminOperationException] {
+      adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
+    }
+
+    val partialList = List(0, 1, 2, 3, 5)
+    val processedMetadatas3 = adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced, Some(partialList))
+    assertEquals(partialList, processedMetadatas3.map(_.id))
+    assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack))
+
+    val numPartitions = 3
+    adminZkClient.createTopic("foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe)
+    val assignment = zkClient.getReplicaAssignmentForTopics(Set("foo"))
+    assertEquals(numPartitions, assignment.size)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 347569d..28d8430 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,9 +16,11 @@
 */
 package kafka.zk
 
-import java.util.UUID
+import java.util.{Properties, UUID}
 
+import kafka.log.LogConfig
 import kafka.security.auth._
+import kafka.server.ConfigType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
@@ -70,30 +72,57 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     zkClient.createRecursive("/create/some/random/long/path")
     assertTrue(zkClient.pathExists("/create/some/random/long/path"))
-    zkClient.createRecursive("/create/some/random/long/path") // no errors if path already exists
+    zkClient.createRecursive("/create/some/random/long/path", throwIfPathExists = false) // no errors if path already exists
 
     intercept[IllegalArgumentException](zkClient.createRecursive("create-invalid-path"))
   }
 
   @Test
-  def testGetTopicPartitionCount() {
-    val topic = "mytest"
+  def testTopicAssignmentMethods() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
 
     // test with non-existing topic
-    assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty)
-
-    // create a topic path
-    zkClient.createRecursive(TopicZNode.path(topic))
+    assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
+    assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
+    assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
+    assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
     val assignment = Map(
-      new TopicPartition(topic, 0) -> Seq(0, 1),
-      new TopicPartition(topic, 1) -> Seq(0, 1)
+      new TopicPartition(topic1, 0) -> Seq(0, 1),
+      new TopicPartition(topic1, 1) -> Seq(0, 1),
+      new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
-    zkClient.setTopicAssignmentRaw(topic, assignment)
 
-    assertEquals(2, zkClient.getTopicPartitionCount(topic).get)
-  }
+    // create a topic assignment
+    zkClient.createTopicAssignment(topic1, assignment)
+
+    val expectedAssignment = assignment map { topicAssignment =>
+      val partition = topicAssignment._1.partition
+      val assignment = topicAssignment._2
+      partition -> assignment
+    }
+
+    assertEquals(assignment.size, zkClient.getTopicPartitionCount(topic1).get)
+    assertEquals(expectedAssignment, zkClient.getPartitionAssignmentForTopics(Set(topic1)).get(topic1).get)
+    assertEquals(Set(0, 1, 2), zkClient.getPartitionsForTopics(Set(topic1)).get(topic1).get.toSet)
+    assertEquals(Set(1, 2, 3), zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).toSet)
+
+    val updatedAssignment = assignment - new TopicPartition(topic1, 2)
+
+    zkClient.setTopicAssignment(topic1, updatedAssignment)
+    assertEquals(updatedAssignment.size, zkClient.getTopicPartitionCount(topic1).get)
+
+    // add second topic
+    val secondAssignment = Map(
+      new TopicPartition(topic2, 0) -> Seq(0, 1),
+      new TopicPartition(topic2, 1) -> Seq(0, 1)
+    )
+
+    zkClient.createTopicAssignment(topic2, secondAssignment)
 
+    assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+  }
 
   @Test
   def testGetDataAndVersion() {
@@ -163,6 +192,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     zkClient.deletePartitionReassignment()
     assertEquals(Map.empty, zkClient.getPartitionReassignment)
+
+    zkClient.createPartitionReassignment(reassignment)
+    assertEquals(reassignment, zkClient.getPartitionReassignment)
   }
 
   @Test
@@ -276,6 +308,49 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     zkClient.deleteAclChangeNotifications()
     assertTrue(zkClient.getChildren(AclChangeNotificationZNode.path).isEmpty)
+  }
+
+  @Test
+  def testDeleteTopicPathMethods() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+
+    assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
+    assertTrue(zkClient.getTopicDeletions.isEmpty)
+
+    zkClient.createDeleteTopicPath(topic1)
+    zkClient.createDeleteTopicPath(topic2)
+
+    assertTrue(zkClient.isTopicMarkedForDeletion(topic1))
+    assertEquals(Set(topic1, topic2), zkClient.getTopicDeletions.toSet)
+
+    zkClient.deleteTopicDeletions(Seq(topic1, topic2))
+    assertTrue(zkClient.getTopicDeletions.isEmpty)
+  }
+
+  @Test
+  def testEntityConfigManagementMethods() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+
+    assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, "1024")
+    logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+    assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
+
+    logProps.remove(LogConfig.CleanupPolicyProp)
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+    assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps)
+    assertEquals(Set(topic1, topic2), zkClient.getAllEntitiesWithConfig(ConfigType.Topic).toSet)
 
+    zkClient.deleteTopicConfigs(Seq(topic1, topic2))
+    assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index cc1b9c1..4966b10 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -46,6 +46,8 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   var zkUtils: ZkUtils = null
   var zooKeeperClient: ZooKeeperClient = null
   var zkClient: KafkaZkClient = null
+  var adminZkClient: AdminZkClient = null
+
   var zookeeper: EmbeddedZookeeper = null
 
   def zkPort: Int = zookeeper.port
@@ -58,6 +60,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
     zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests)
     zkClient = new KafkaZkClient(zooKeeperClient, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
+    adminZkClient = new AdminZkClient(zkClient)
   }
 
   @After


[3/3] kafka git commit: KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager

Posted by ju...@apache.org.
KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager

* Add AdminZkClient class
* Use KafkaZkClient and AdminZkClient in ConfigCommand and TopicCommand
* All the existing tests should work

Author: Manikumar Reddy <ma...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #4194 from omkreddy/KAFKA-5646-ZK-ADMIN-UTILS-DYNAMIC-MANAGER


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc852baf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc852baf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc852baf

Branch: refs/heads/trunk
Commit: bc852baffbf602ead9cb719a01747de414940d53
Parents: c5f31fe
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Nov 22 13:25:52 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 22 13:25:52 2017 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  15 +
 .../main/scala/kafka/admin/ConfigCommand.scala  |  37 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  86 ++--
 .../main/scala/kafka/server/AdminManager.scala  |  32 +-
 .../kafka/server/DynamicConfigManager.scala     |  16 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  10 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   6 +-
 .../src/main/scala/kafka/zk/AdminZkClient.scala | 417 +++++++++++++++++++
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 339 ++++++++++++---
 core/src/main/scala/kafka/zk/ZkData.scala       |  27 +-
 .../ReassignPartitionsIntegrationTest.scala     |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   5 +-
 .../kafka/api/BaseProducerSendTest.scala        |   3 +-
 .../kafka/api/ClientIdQuotaTest.scala           |   3 +-
 .../api/RackAwareAutoTopicCreationTest.scala    |   4 +-
 .../kafka/api/UserClientIdQuotaTest.scala       |   9 +-
 .../integration/kafka/api/UserQuotaTest.scala   |   5 +-
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   3 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  12 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |  78 ++--
 .../unit/kafka/admin/DeleteTopicTest.scala      |  38 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala |   4 +-
 .../kafka/admin/ListConsumerGroupTest.scala     |   3 +-
 .../admin/ResetConsumerGroupOffsetTest.scala    |  80 ++--
 .../unit/kafka/admin/TopicCommandTest.scala     |  44 +-
 .../controller/ControllerFailoverTest.scala     |   3 +-
 ...MetricsDuringTopicCreationDeletionTest.scala |   5 +-
 .../kafka/integration/TopicMetadataTest.scala   |   3 +-
 .../integration/UncleanLeaderElectionTest.scala |  11 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala  |   9 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |   8 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  24 +-
 .../unit/kafka/server/DynamicConfigTest.scala   |  29 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |   1 -
 .../scala/unit/kafka/server/LogOffsetTest.scala |   9 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   4 +-
 .../kafka/server/ReplicationQuotasTest.scala    |  16 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |   5 +-
 ...rivenReplicationProtocolAcceptanceTest.scala |   9 +-
 .../epoch/LeaderEpochIntegrationTest.scala      |   7 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala | 323 ++++++++++++++
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 101 ++++-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |   3 +
 43 files changed, 1459 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 32cab2a..09a65af 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -32,6 +32,7 @@ import scala.collection.JavaConverters._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.internals.Topic
 
+@deprecated("This class is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
 trait AdminUtilities {
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
   def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
@@ -267,6 +268,7 @@ object AdminUtils extends Logging with AdminUtilities {
   * @param validateOnly If true, validate the parameters without actually adding the partitions
   * @return the updated replica assignment
   */
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def addPartitions(zkUtils: ZkUtils,
                     topic: String,
                     existingAssignment: Map[Int, Seq[Int]],
@@ -359,6 +361,7 @@ object AdminUtils extends Logging with AdminUtilities {
     }
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def deleteTopic(zkUtils: ZkUtils, topic: String) {
       if (topicExists(zkUtils, topic)) {
         try {
@@ -434,6 +437,7 @@ object AdminUtils extends Logging with AdminUtilities {
   def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
     zkUtils.pathExists(getTopicPath(topic))
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
                          brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
     val allBrokers = zkUtils.getAllBrokersInCluster()
@@ -452,6 +456,7 @@ object AdminUtils extends Logging with AdminUtilities {
     brokerMetadatas.sortBy(_.id)
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def createTopic(zkUtils: ZkUtils,
                   topic: String,
                   partitions: Int,
@@ -463,6 +468,7 @@ object AdminUtils extends Logging with AdminUtilities {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def validateCreateOrUpdateTopic(zkUtils: ZkUtils,
                                   topic: String,
                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -501,6 +507,7 @@ object AdminUtils extends Logging with AdminUtilities {
       LogConfig.validate(config)
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                      topic: String,
                                                      partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -548,6 +555,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *                 existing configs need to be deleted, it should be done prior to invoking this API
    *
    */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) {
     DynamicConfig.Client.validate(configs)
     changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
@@ -564,6 +572,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *                 existing configs need to be deleted, it should be done prior to invoking this API
    *
    */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
     if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
       DynamicConfig.Client.validate(configs)
@@ -589,6 +598,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *                 existing configs need to be deleted, it should be done prior to invoking this API
    *
    */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
     validateTopicConfig(zkUtils, topic, configs)
     changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
@@ -602,6 +612,7 @@ object AdminUtils extends Logging with AdminUtilities {
     * @param brokers: The list of brokers to apply config changes to
     * @param configs: The config to change, as properties
     */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
     DynamicConfig.Broker.validate(configs)
     brokers.foreach { broker =>
@@ -637,6 +648,7 @@ object AdminUtils extends Logging with AdminUtilities {
    * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
    * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
    */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
     val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
     // readDataMaybeNull returns Some(null) if the path exists, but there is no data
@@ -657,12 +669,15 @@ object AdminUtils extends Logging with AdminUtilities {
     props
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
     zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
     zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
     def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
       val root = rootPath match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index febf40f..077ecce 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -24,8 +24,10 @@ import kafka.common.Config
 import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils.{CommandLineUtils, ZkUtils}
+import kafka.utils.CommandLineUtils
 import kafka.utils.Implicits._
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
 import org.apache.kafka.common.utils.{Sanitizer, Utils}
@@ -61,26 +63,25 @@ object ConfigCommand extends Config {
 
     opts.checkArgs()
 
-    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
-                          30000,
-                          30000,
-                          JaasUtils.isZkSecurityEnabled())
+    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue)
+    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+    val adminZkClient = new AdminZkClient(zkClient)
 
     try {
       if (opts.options.has(opts.alterOpt))
-        alterConfig(zkUtils, opts)
+        alterConfig(zkClient, opts, adminZkClient)
       else if (opts.options.has(opts.describeOpt))
-        describeConfig(zkUtils, opts)
+        describeConfig(zkClient, opts, adminZkClient)
     } catch {
       case e: Throwable =>
         println("Error while executing config command " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
-      zkUtils.close()
+      zkClient.close()
     }
   }
 
-  private[admin] def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) {
+  private[admin] def alterConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
     val configsToBeAdded = parseConfigsToBeAdded(opts)
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
     val entity = parseEntity(opts)
@@ -91,7 +92,7 @@ object ConfigCommand extends Config {
       preProcessScramCredentials(configsToBeAdded)
 
     // compile the final set of configs
-    val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName)
+    val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
 
     // fail the command if any of the configs to be deleted does not exist
     val invalidConfigs = configsToBeDeleted.filterNot(configs.containsKey(_))
@@ -101,7 +102,7 @@ object ConfigCommand extends Config {
     configs ++= configsToBeAdded
     configsToBeDeleted.foreach(configs.remove(_))
 
-    utils.changeConfigs(zkUtils, entityType, entityName, configs)
+    adminZkClient.changeConfigs(entityType, entityName, configs)
 
     println(s"Completed Updating config for entity: $entity.")
   }
@@ -127,12 +128,12 @@ object ConfigCommand extends Config {
     }
   }
 
-  private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
+  private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
     val configEntity = parseEntity(opts)
     val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
-    val entities = configEntity.getAllEntities(zkUtils)
+    val entities = configEntity.getAllEntities(zkClient)
     for (entity <- entities) {
-      val configs = AdminUtils.fetchEntityConfig(zkUtils, entity.root.entityType, entity.fullSanitizedName)
+      val configs = adminZkClient.fetchEntityConfig(entity.root.entityType, entity.fullSanitizedName)
       // When describing all users, don't include empty user nodes with only <user, client> quota overrides.
       if (!configs.isEmpty || !describeAllUsers) {
         println("Configs for %s are %s"
@@ -196,7 +197,7 @@ object ConfigCommand extends Config {
   case class ConfigEntity(root: Entity, child: Option[Entity]) {
     val fullSanitizedName = root.sanitizedName.getOrElse("") + child.map(s => "/" + s.entityPath).getOrElse("")
 
-    def getAllEntities(zkUtils: ZkUtils) : Seq[ConfigEntity] = {
+    def getAllEntities(zkClient: KafkaZkClient) : Seq[ConfigEntity] = {
       // Describe option examples:
       //   Describe entity with specified name:
       //     --entity-type topics --entity-name topic1 (topic1)
@@ -211,19 +212,19 @@ object ConfigCommand extends Config {
       //     --entity-type users --entity-default --entity-type clients --entity-default (Default <user, client>)
       (root.sanitizedName, child) match {
         case (None, _) =>
-          val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
+          val rootEntities = zkClient.getAllEntitiesWithConfig(root.entityType)
                                    .map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
           child match {
             case Some(s) =>
                 rootEntities.flatMap(rootEntity =>
-                  ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
+                  ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkClient))
             case None => rootEntities
           }
         case (_, Some(childEntity)) =>
           childEntity.sanitizedName match {
             case Some(_) => Seq(this)
             case None =>
-                zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
+                zkClient.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
                        .map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
 
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f2a74a0..bdd8aaf 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -25,18 +25,19 @@ import kafka.utils.Implicits._
 import kafka.consumer.Whitelist
 import kafka.log.LogConfig
 import kafka.server.ConfigType
-import kafka.utils.ZkUtils._
 import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
+import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConverters._
 import scala.collection._
 
-
 object TopicCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
@@ -53,36 +54,35 @@ object TopicCommand extends Logging {
 
     opts.checkArgs()
 
-    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
-                          30000,
-                          30000,
-                          JaasUtils.isZkSecurityEnabled())
+    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue)
+    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+
     var exitCode = 0
     try {
       if(opts.options.has(opts.createOpt))
-        createTopic(zkUtils, opts)
+        createTopic(zkClient, opts)
       else if(opts.options.has(opts.alterOpt))
-        alterTopic(zkUtils, opts)
+        alterTopic(zkClient, opts)
       else if(opts.options.has(opts.listOpt))
-        listTopics(zkUtils, opts)
+        listTopics(zkClient, opts)
       else if(opts.options.has(opts.describeOpt))
-        describeTopic(zkUtils, opts)
+        describeTopic(zkClient, opts)
       else if(opts.options.has(opts.deleteOpt))
-        deleteTopic(zkUtils, opts)
+        deleteTopic(zkClient, opts)
     } catch {
       case e: Throwable =>
         println("Error while executing topic command : " + e.getMessage)
         error(Utils.stackTrace(e))
         exitCode = 1
     } finally {
-      zkUtils.close()
+      zkClient.close()
       Exit.exit(exitCode)
     }
 
   }
 
-  private def getTopics(zkUtils: ZkUtils, opts: TopicCommandOptions): Seq[String] = {
-    val allTopics = zkUtils.getAllTopics().sorted
+  private def getTopics(zkClient: KafkaZkClient, opts: TopicCommandOptions): Seq[String] = {
+    val allTopics = zkClient.getAllTopicsInCluster.sorted
     if (opts.options.has(opts.topicOpt)) {
       val topicsSpec = opts.options.valueOf(opts.topicOpt)
       val topicsFilter = new Whitelist(topicsSpec)
@@ -91,23 +91,24 @@ object TopicCommand extends Logging {
       allTopics
   }
 
-  def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+  def createTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
     val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
     if (Topic.hasCollisionChars(topic))
       println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
+    val adminZkClient = new AdminZkClient(zkClient)
     try {
       if (opts.options.has(opts.replicaAssignmentOpt)) {
         val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
+        adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, configs, update = false)
       } else {
         CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
         val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
         val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                             else RackAwareMode.Enforced
-        AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
+        adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)
       }
       println("Created topic \"%s\".".format(topic))
     } catch  {
@@ -115,15 +116,16 @@ object TopicCommand extends Logging {
     }
   }
 
-  def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
-    val topics = getTopics(zkUtils, opts)
+  def alterTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
     if (topics.isEmpty && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
           opts.options.valueOf(opts.zkConnectOpt)))
     }
+    val adminZkClient = new AdminZkClient(zkClient)
     topics.foreach { topic =>
-      val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+      val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
       if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
         println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
         println("         Going forward, please use kafka-configs.sh for this functionality")
@@ -133,7 +135,7 @@ object TopicCommand extends Logging {
         // compile the final set of configs
         configs ++= configsToBeAdded
         configsToBeDeleted.foreach(config => configs.remove(config))
-        AdminUtils.changeTopicConfig(zkUtils, topic, configs)
+        adminZkClient.changeTopicConfig(topic, configs)
         println("Updated config for topic \"%s\".".format(topic))
       }
 
@@ -144,7 +146,7 @@ object TopicCommand extends Logging {
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +
           "logic or ordering of the messages will be affected")
         val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
-        val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+        val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
           case (topicPartition, replicas) => topicPartition.partition -> replicas
         }
         if (existingAssignment.isEmpty)
@@ -155,17 +157,17 @@ object TopicCommand extends Logging {
           val partitionList = replicaAssignmentString.split(",").drop(startPartitionId)
           AdminUtils.parseReplicaAssignment(partitionList.mkString(","), startPartitionId)
         }
-        val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
-        AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers, nPartitions, newAssignment)
+        val allBrokers = adminZkClient.getBrokerMetadatas()
+        adminZkClient.addPartitions(topic, existingAssignment, allBrokers, nPartitions, newAssignment)
         println("Adding partitions succeeded!")
       }
     }
   }
 
-  def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
-    val topics = getTopics(zkUtils, opts)
+  def listTopics(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
     for(topic <- topics) {
-      if (zkUtils.isTopicMarkedForDeletion(topic)) {
+      if (zkClient.isTopicMarkedForDeletion(topic)) {
         println("%s - marked for deletion".format(topic))
       } else {
         println(topic)
@@ -173,8 +175,8 @@ object TopicCommand extends Logging {
     }
   }
 
-  def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
-    val topics = getTopics(zkUtils, opts)
+  def deleteTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
     if (topics.isEmpty && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
@@ -185,12 +187,12 @@ object TopicCommand extends Logging {
         if (Topic.isInternal(topic)) {
           throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
         } else {
-          zkUtils.createPersistentPath(getDeleteTopicPath(topic))
+          zkClient.createDeleteTopicPath(topic)
           println("Topic %s is marked for deletion.".format(topic))
           println("Note: This will have no impact if delete.topic.enable is not set to true.")
         }
       } catch {
-        case _: ZkNodeExistsException =>
+        case _: NodeExistsException =>
           println("Topic %s is already marked for deletion.".format(topic))
         case e: AdminOperationException =>
           throw e
@@ -200,21 +202,23 @@ object TopicCommand extends Logging {
     }
   }
 
-  def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
-    val topics = getTopics(zkUtils, opts)
+  def describeTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
     val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
     val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
     val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
-    val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
+    val liveBrokers = zkClient.getAllBrokersInCluster.map(_.id).toSet
+    val adminZkClient = new AdminZkClient(zkClient)
+
     for (topic <- topics) {
-      zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
+       zkClient.getPartitionAssignmentForTopics(immutable.Set(topic)).get(topic) match {
         case Some(topicPartitionAssignment) =>
           val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
           val describePartitions: Boolean = !reportOverriddenConfigs
           val sortedPartitions = topicPartitionAssignment.toSeq.sortBy(_._1)
-          val markedForDeletion = zkUtils.isTopicMarkedForDeletion(topic)
+          val markedForDeletion = zkClient.isTopicMarkedForDeletion(topic)
           if (describeConfigs) {
-            val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
+            val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic).asScala
             if (!reportOverriddenConfigs || configs.nonEmpty) {
               val numPartitions = topicPartitionAssignment.size
               val replicationFactor = topicPartitionAssignment.head._2.size
@@ -226,8 +230,10 @@ object TopicCommand extends Logging {
           }
           if (describePartitions) {
             for ((partitionId, assignedReplicas) <- sortedPartitions) {
-              val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
-              val leader = zkUtils.getLeaderForPartition(topic, partitionId)
+              val leaderIsrEpoch = zkClient.getTopicPartitionState(new TopicPartition(topic, partitionId))
+              val inSyncReplicas = if (leaderIsrEpoch.isEmpty) Seq.empty[Int] else leaderIsrEpoch.get.leaderAndIsr.isr
+              val leader = if (leaderIsrEpoch.isEmpty) None else Option(leaderIsrEpoch.get.leaderAndIsr.leader)
+
               if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
                   (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
                   (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 935fade..8f69000 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,6 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
@@ -41,11 +42,12 @@ import scala.collection.JavaConverters._
 class AdminManager(val config: KafkaConfig,
                    val metrics: Metrics,
                    val metadataCache: MetadataCache,
-                   val zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
+                   val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
 
   this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
 
   private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
+  private val adminZkClient = new AdminZkClient(zkClient)
 
   private val createTopicPolicy =
     Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
@@ -101,7 +103,7 @@ class AdminManager(val config: KafkaConfig,
 
         createTopicPolicy match {
           case Some(policy) =>
-            AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false)
+            adminZkClient.validateCreateOrUpdateTopic(topic, assignments, configs, update = false)
 
             // Use `null` for unset fields in the public API
             val numPartitions: java.lang.Integer =
@@ -114,13 +116,13 @@ class AdminManager(val config: KafkaConfig,
               arguments.configs))
 
             if (!validateOnly)
-              AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+              adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignments, configs, update = false)
 
           case None =>
             if (validateOnly)
-              AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false)
+              adminZkClient.validateCreateOrUpdateTopic(topic, assignments, configs, update = false)
             else
-              AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+              adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignments, configs, update = false)
         }
         CreatePartitionsMetadata(topic, assignments, ApiError.NONE)
       } catch {
@@ -165,7 +167,7 @@ class AdminManager(val config: KafkaConfig,
     // 1. map over topics calling the asynchronous delete
     val metadata = topics.map { topic =>
         try {
-          AdminUtils.deleteTopic(zkUtils, topic)
+          adminZkClient.deleteTopic(topic)
           DeleteTopicMetadata(topic, Errors.NONE)
         } catch {
           case _: TopicAlreadyMarkedForDeletionException =>
@@ -203,8 +205,8 @@ class AdminManager(val config: KafkaConfig,
                        listenerName: ListenerName,
                        callback: Map[String, ApiError] => Unit): Unit = {
 
-    val reassignPartitionsInProgress = zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
-    val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
+    val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
+    val allBrokers = adminZkClient.getBrokerMetadatas()
     val allBrokerIds = allBrokers.map(_.id)
 
     // 1. map over topics creating assignment and calling AdminUtils
@@ -215,7 +217,7 @@ class AdminManager(val config: KafkaConfig,
         if (reassignPartitionsInProgress)
           throw new ReassignmentInProgressException("A partition reassignment is in progress.")
 
-        val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+        val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
           case (topicPartition, replicas) => topicPartition.partition -> replicas
         }
         if (existingAssignment.isEmpty)
@@ -247,7 +249,7 @@ class AdminManager(val config: KafkaConfig,
           }.toMap
         }
 
-        val updatedReplicaAssignment = AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers,
+        val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers,
           newPartition.totalCount, reassignment, validateOnly = validateOnly)
         CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
       } catch {
@@ -306,7 +308,7 @@ class AdminManager(val config: KafkaConfig,
             val topic = resource.name
             Topic.validate(topic)
             // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
-            val topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+            val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
             val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
             createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name))
 
@@ -350,19 +352,19 @@ class AdminManager(val config: KafkaConfig,
 
             alterConfigPolicy match {
               case Some(policy) =>
-                AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+                adminZkClient.validateTopicConfig(topic, properties)
 
                 val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
                 policy.validate(new AlterConfigPolicy.RequestMetadata(
                   new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
 
                 if (!validateOnly)
-                  AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+                  adminZkClient.changeTopicConfig(topic, properties)
               case None =>
                 if (validateOnly)
-                  AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+                  adminZkClient.validateTopicConfig(topic, properties)
                 else
-                  AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+                  adminZkClient.changeTopicConfig(topic, properties)
             }
             resource -> ApiError.NONE
           case resourceType =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 6392723..457742d 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -26,7 +26,7 @@ import scala.collection._
 import scala.collection.JavaConverters._
 import kafka.admin.AdminUtils
 import kafka.utils.json.JsonObject
-import kafka.zk.KafkaZkClient
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.security.scram.ScramMechanism
 import org.apache.kafka.common.utils.Time
@@ -84,11 +84,11 @@ object ConfigEntityName {
  * on startup where a change might be missed between the initial config load and registering for change notifications.
  *
  */
-class DynamicConfigManager(private val oldZkUtils: ZkUtils,
-                           private val zkClient: KafkaZkClient,
+class DynamicConfigManager(private val zkClient: KafkaZkClient,
                            private val configHandlers: Map[String, ConfigHandler],
                            private val changeExpirationMs: Long = 15*60*1000,
                            private val time: Time = Time.SYSTEM) extends Logging {
+  val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(json: String) = {
@@ -120,7 +120,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
         throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
       }
 
-      val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, entityType, entity)
+      val entityConfig = adminZkClient.fetchEntityConfig(entityType, entity)
       info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
       configHandlers(entityType).processConfigChanges(entity, entityConfig)
 
@@ -141,7 +141,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
       }
       val fullSanitizedEntityName = entityPath.substring(index + 1)
 
-      val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, rootEntityType, fullSanitizedEntityName)
+      val entityConfig = adminZkClient.fetchEntityConfig(rootEntityType, fullSanitizedEntityName)
       val loggableConfig = entityConfig.asScala.map {
         case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v)
       }
@@ -163,14 +163,14 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
     // Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides
     configHandlers.foreach {
       case (ConfigType.User, handler) =>
-        AdminUtils.fetchAllEntityConfigs(oldZkUtils, ConfigType.User).foreach {
+        adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach {
           case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
         }
-        AdminUtils.fetchAllChildEntityConfigs(oldZkUtils, ConfigType.User, ConfigType.Client).foreach {
+        adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach {
           case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
         }
       case (configType, handler) =>
-        AdminUtils.fetchAllEntityConfigs(oldZkUtils, configType).foreach {
+        adminZkClient.fetchAllEntityConfigs(configType).foreach {
           case (entityName, properties) => handler.processConfigChanges(entityName, properties)
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index de56986..a31b6c3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,7 +29,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
 import kafka.common.{OffsetAndMetadata, OffsetMetadata}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.controller.{KafkaController}
+import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
@@ -37,8 +37,8 @@ import kafka.network.RequestChannel
 import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import kafka.zk.KafkaZkClient
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -71,7 +71,6 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val groupCoordinator: GroupCoordinator,
                 val txnCoordinator: TransactionCoordinator,
                 val controller: KafkaController,
-                val zkUtils: ZkUtils,
                 val zkClient: KafkaZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
@@ -84,6 +83,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 time: Time) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
+  val adminZkClient = new AdminZkClient(zkClient)
 
   def close() {
     info("Shutdown complete.")
@@ -829,7 +829,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                           replicationFactor: Int,
                           properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
     try {
-      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
+      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
       info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
         .format(topic, numPartitions, replicationFactor))
       new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1812eb0..7f61479 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -242,7 +242,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix)
         kafkaController.startup()
 
-        adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
+        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
 
         /* start group coordinator */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
@@ -263,7 +263,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
-          kafkaController, zkUtils, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
           brokerTopicStats, clusterId, time)
 
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
@@ -278,7 +278,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                                                            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
         // Create the config manager. start listening to notifications
-        dynamicConfigManager = new DynamicConfigManager(zkUtils, zkClient, dynamicConfigHandlers)
+        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
         /* tell everyone we are alive */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/AdminZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
new file mode 100644
index 0000000..e00b8e6
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import java.util.Properties
+
+import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
+import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.log.LogConfig
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
+import org.apache.zookeeper.KeeperException.NodeExistsException
+
+import scala.collection.{Map, Seq}
+
+class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Creates a topic by computing a replica assignment over the brokers returned by
+   * [[getBrokerMetadatas]] and then writing the topic config and assignment.
+   *
+   * @param topic name of the topic to create
+   * @param partitions number of partitions to create
+   * @param replicationFactor number of replicas per partition
+   * @param topicConfig topic configs written together with the new topic
+   * @param rackAwareMode rack-awareness mode used when collecting the broker metadata
+   */
+  def createTopic(topic: String,
+                  partitions: Int,
+                  replicationFactor: Int,
+                  topicConfig: Properties = new Properties,
+                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
+    val assignment = AdminUtils.assignReplicasToBrokers(
+      getBrokerMetadatas(rackAwareMode), partitions, replicationFactor)
+    createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, topicConfig)
+  }
+
+  /**
+   * Builds the broker metadata (id and optional rack) used for replica assignment.
+   *
+   * @param rackAwareMode controls whether rack information is required, ignored, or
+   *                      used only when every selected broker has it
+   * @param brokerList if defined, only brokers whose id is in this list are considered;
+   *                   otherwise all brokers returned by the zk client are used
+   * @return the selected brokers' metadata, sorted by broker id
+   * @throws AdminOperationException in `Enforced` mode when some, but not all, of the
+   *                                 selected brokers have rack information
+   */
+  def getBrokerMetadatas(rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
+                         brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+    val clusterBrokers = zkClient.getAllBrokersInCluster
+    val candidates = brokerList match {
+      case Some(brokerIds) => clusterBrokers.filter(broker => brokerIds.contains(broker.id))
+      case None => clusterBrokers
+    }
+    val rackedCount = candidates.count(_.rack.nonEmpty)
+    val someBrokersLackRack = rackedCount < candidates.size
+
+    if (rackAwareMode == RackAwareMode.Enforced && rackedCount > 0 && someBrokersLackRack) {
+      throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
+        " to make replica assignment without rack information.")
+    }
+
+    // Rack info is dropped when rack awareness is disabled, or in Safe mode when it is incomplete
+    val dropRackInfo = rackAwareMode match {
+      case RackAwareMode.Disabled => true
+      case RackAwareMode.Safe => someBrokersLackRack
+      case _ => false
+    }
+    candidates
+      .map(broker => BrokerMetadata(broker.id, if (dropRackInfo) None else broker.rack))
+      .sortBy(_.id)
+  }
+
+  /**
+   * Validates and then writes the partition assignment of a topic, creating the topic
+   * when `update` is false and overwriting the existing assignment otherwise.
+   *
+   * @param topic name of the topic
+   * @param partitionReplicaAssignment map from partition id to the replicas assigned to it
+   * @param config topic configs; only written when the topic is being created
+   * @param update true to update an existing topic, false to create a new one
+   */
+  def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String,
+                                                     partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                                     config: Properties = new Properties,
+                                                     update: Boolean = false) {
+    validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
+
+    // Configs are only written on creation; altering them through a topic update is not supported.
+    // This write is not transactional with the partition assignment written below.
+    val creatingTopic = !update
+    if (creatingTopic)
+      zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
+
+    // finally persist the partition assignment itself
+    writeTopicPartitionAssignment(topic, partitionReplicaAssignment, update)
+  }
+
+  /**
+   * Validates the arguments of a topic creation or update before anything is written.
+   *
+   * @param topic name of the topic; checked with `Topic.validate`
+   * @param partitionReplicaAssignment map from partition id to the replicas assigned to it
+   * @param config topic configs; only validated when the topic is being created
+   * @param update true if an existing topic is being updated, false if a topic is being created
+   * @throws TopicExistsException if a topic is being created and it already exists
+   * @throws InvalidTopicException if a topic is being created and its name collides with existing topics
+   * @throws InvalidReplicaAssignmentException if the partitions do not all have the same number of
+   *                                           replicas, or a partition lists the same replica twice
+   */
+  def validateCreateOrUpdateTopic(topic: String,
+                                  partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                  config: Properties,
+                                  update: Boolean): Unit = {
+    // validate arguments
+    Topic.validate(topic)
+
+    if (!update) {
+      if (zkClient.topicExists(topic))
+        throw new TopicExistsException(s"Topic '$topic' already exists.")
+      else if (Topic.hasCollisionChars(topic)) {
+        val allTopics = zkClient.getAllTopicsInCluster
+        // check again in case the topic was created in the meantime, otherwise the
+        // topic could potentially collide with itself
+        if (allTopics.contains(topic))
+          throw new TopicExistsException(s"Topic '$topic' already exists.")
+        val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _))
+        if (collidingTopics.nonEmpty) {
+          throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}")
+        }
+      }
+    }
+
+    if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1)
+      throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
+
+    partitionReplicaAssignment.values.foreach(reps =>
+      if (reps.size != reps.toSet.size)
+        throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
+    )
+
+    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+    if (!update)
+      LogConfig.validate(config)
+  }
+
+  /**
+   * Writes the replica assignment of a topic through the zk client: creates the topic
+   * assignment when `update` is false, overwrites the existing one otherwise.
+   *
+   * @param topic name of the topic
+   * @param replicaAssignment map from partition id to the replicas assigned to it
+   * @param update true to overwrite an existing assignment, false to create a new one
+   * @throws TopicExistsException if the write fails with a `NodeExistsException`
+   * @throws AdminOperationException if the write fails with any other non-fatal error
+   */
+  private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
+    try {
+      val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic, partitionId), replicas) }.toMap
+
+      if (!update) {
+        info("Topic creation " + assignment)
+        zkClient.createTopicAssignment(topic, assignment)
+      } else {
+        info("Topic update " + assignment)
+        zkClient.setTopicAssignment(topic, assignment)
+      }
+      debug("Updated path %s with %s for replica assignment".format(TopicZNode.path(topic), assignment))
+    } catch {
+      case _: NodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
+      // Only non-fatal failures are wrapped; fatal errors (e.g. OutOfMemoryError, InterruptedException)
+      // must propagate unchanged instead of being disguised as an admin failure.
+      case scala.util.control.NonFatal(e2) => throw new AdminOperationException(e2.toString)
+    }
+  }
+
+
+  /**
+   * Marks a topic for deletion by creating its delete-topic path through the zk client.
+   *
+   * @param topic name of the topic to delete
+   * @throws UnknownTopicOrPartitionException if the topic does not exist
+   * @throws TopicAlreadyMarkedForDeletionException if the delete path already exists
+   * @throws AdminOperationException if creating the delete path fails with any other non-fatal error
+   */
+  def deleteTopic(topic: String) {
+    if (zkClient.topicExists(topic)) {
+      try {
+        zkClient.createDeleteTopicPath(topic)
+      } catch {
+        case _: NodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+          "topic %s is already marked for deletion".format(topic))
+        // Only non-fatal failures are wrapped; fatal errors must propagate unchanged
+        case scala.util.control.NonFatal(e) => throw new AdminOperationException(e.getMessage)
+      }
+    } else {
+      throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
+    }
+  }
+
+  /**
+   * Add partitions to existing topic with optional replica assignment
+   *
+   * @param topic Topic for adding partitions to
+   * @param existingAssignment A map from partition id to its assigned replicas
+   * @param allBrokers All brokers in the cluster
+   * @param numPartitions Total number of partitions the topic should have afterwards; must be
+   *                      greater than the number of partitions in `existingAssignment`
+   * @param replicaAssignment Manual replica assignment for the new partitions, or none
+   * @param validateOnly If true, validate the parameters without actually adding the partitions
+   * @return the updated replica assignment (existing partitions plus the new ones)
+   * @throws AdminOperationException if `existingAssignment` has no entry for partition 0
+   * @throws InvalidPartitionsException if `numPartitions` would not increase the partition count
+   */
+  def addPartitions(topic: String,
+                    existingAssignment: Map[Int, Seq[Int]],
+                    allBrokers: Seq[BrokerMetadata],
+                    numPartitions: Int = 1,
+                    replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
+                    validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
+    val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
+      throw new AdminOperationException(
+        s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
+          s"Assignment: $existingAssignment"))
+
+    val partitionsToAdd = numPartitions - existingAssignment.size
+    if (partitionsToAdd <= 0)
+      throw new InvalidPartitionsException(
+        s"The number of partitions for a topic can only be increased. " +
+          s"Topic $topic currently has ${existingAssignment.size} partitions, " +
+          s"$numPartitions would not be an increase.")
+
+    replicaAssignment.foreach { proposedReplicaAssignment =>
+      validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0.size,
+        allBrokers.map(_.id).toSet)
+    }
+
+    val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
+      val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
+      AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
+        startIndex, existingAssignment.size)
+    }
+    val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions
+    if (!validateOnly) {
+      info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
+        s"$proposedAssignmentForNewPartitions.")
+      // add the combined new list
+      createOrUpdateTopicPartitionAssignmentPathInZK(topic, proposedAssignment, update = true)
+    }
+    proposedAssignment
+
+  }
+
+  /**
+   * Validates a manually supplied replica assignment.
+   *
+   * @param replicaAssignment map from partition id to the replicas proposed for it
+   * @param expectedReplicationFactor number of replicas every partition must have
+   * @param availableBrokerIds ids of the brokers replicas may be placed on
+   * @throws InvalidReplicaAssignmentException if a partition has no replicas, lists a broker twice,
+   *                                           or has a replication factor different from the expected one
+   * @throws BrokerNotAvailableException if a partition references a broker that is not available
+   */
+  private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
+                                        expectedReplicationFactor: Int,
+                                        availableBrokerIds: Set[Int]): Unit = {
+
+    // Per-partition checks: non-empty, no duplicate brokers, only available brokers
+    replicaAssignment.foreach { case (partitionId, replicas) =>
+      if (replicas.isEmpty)
+        throw new InvalidReplicaAssignmentException(
+          s"Cannot have replication factor of 0 for partition id $partitionId.")
+      if (replicas.size != replicas.toSet.size)
+        throw new InvalidReplicaAssignmentException(
+          s"Duplicate brokers not allowed in replica assignment: " +
+            s"${replicas.mkString(", ")} for partition id $partitionId.")
+      if (!replicas.toSet.subsetOf(availableBrokerIds))
+        throw new BrokerNotAvailableException(
+          s"Some brokers specified for partition id $partitionId are not available. " +
+            s"Specified brokers: ${replicas.mkString(", ")}, " +
+            s"available brokers: ${availableBrokerIds.mkString(", ")}.")
+    }
+    // Cross-partition check: every partition must match the expected replication factor
+    val badRepFactors = replicaAssignment.collect {
+      case (partition, replicas) if replicas.size != expectedReplicationFactor => partition -> replicas.size
+    }
+    if (badRepFactors.nonEmpty) {
+      val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId }
+      val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId }
+      val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
+      throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " +
+        s"partition 0 has ${expectedReplicationFactor} while partitions [${partitions.mkString(", ")}] have " +
+        s"replication factors [${repFactors.mkString(", ")}], respectively.")
+    }
+  }
+
+  /**
+   * Change the configs for a given entityType and entityName by dispatching to the
+   * type-specific change method.
+   *
+   * @param entityType one of `ConfigType.Topic`, `ConfigType.Client`, `ConfigType.User` or `ConfigType.Broker`
+   * @param entityName name of the entity; for brokers it must be a single integer broker id
+   * @param configs the configs to apply to the entity
+   * @throws IllegalArgumentException if `entityType` is not a known type, or a broker
+   *                                  entity name is not an integer
+   */
+  def changeConfigs(entityType: String, entityName: String, configs: Properties): Unit = {
+
+    def parseBroker(broker: String): Int = {
+      try broker.toInt
+      catch {
+        case _: NumberFormatException =>
+          throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
+      }
+    }
+
+    entityType match {
+      case ConfigType.Topic => changeTopicConfig(entityName, configs)
+      case ConfigType.Client => changeClientIdConfig(entityName, configs)
+      case ConfigType.User => changeUserOrUserClientIdConfig(entityName, configs)
+      case ConfigType.Broker => changeBrokerConfig(Seq(parseBroker(entityName)), configs)
+      // The message lists every type handled above, including users
+      case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.User}, ${ConfigType.Broker}")
+    }
+  }
+
+  /**
+   * Update the config for a client and create a change notification so the change will propagate to other brokers.
+   * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
+   * and <user> configs are not specified.
+   *
+   * @param sanitizedClientId: The sanitized clientId for which configs are being changed
+   * @param configs: The final set of configs that will be applied to the client. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeClientIdConfig(sanitizedClientId: String, configs: Properties) {
+    DynamicConfig.Client.validate(configs)
+    changeEntityConfig(ConfigType.Client, sanitizedClientId, configs)
+  }
+
+  /**
+   * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
+   * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
+   * value to be applied if a more specific override is not configured.
+   *
+   * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
+   * @param configs: The final set of configs that will be applied to the <user> or <user, clientId> entity. If any new
+   *                 configs need to be added or existing configs need to be deleted, it should be done prior to
+   *                 invoking this API
+   *
+   */
+  def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties) {
+    if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
+      DynamicConfig.Client.validate(configs)
+    else
+      DynamicConfig.User.validate(configs)
+    changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
+  }
+
+  /**
+   * Validates the topic name, checks that the topic exists and validates the given topic configs
+   * @param topic name of the topic whose configs are being validated
+   * @param configs the topic configs to validate
+   * @throws AdminOperationException if the topic does not exist
+   */
+  def validateTopicConfig(topic: String, configs: Properties): Unit = {
+    Topic.validate(topic)
+    if (!zkClient.topicExists(topic))
+      throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
+    // validate the topic config overrides
+    LogConfig.validate(configs)
+  }
+
+  /**
+   * Validates and then stores new configs for an existing topic, creating a change
+   * notification so the change propagates to other brokers.
+   *
+   * @param topic: The topic for which configs are being changed
+   * @param configs: The complete set of configs the topic should end up with. Additions and
+   *                 removals relative to the current configs must already be reflected in it
+   *
+   */
+  def changeTopicConfig(topic: String, configs: Properties): Unit = {
+    // fail before anything is written if the topic is missing or a config is invalid
+    validateTopicConfig(topic, configs)
+    changeEntityConfig(ConfigType.Topic, topic, configs)
+  }
+
+  /**
+   * Validates the given broker config overrides once, then stores them for each broker
+   * in turn, creating a change notification per broker.
+   *
+   * @param brokers: The ids of the brokers to apply the config changes to
+   * @param configs: The config to change, as properties
+   */
+  def changeBrokerConfig(brokers: Seq[Int], configs: Properties): Unit = {
+    DynamicConfig.Broker.validate(configs)
+    for (brokerId <- brokers)
+      changeEntityConfig(ConfigType.Broker, brokerId.toString, configs)
+  }
+
+  /**
+   * Stores the configs of an entity and then creates a config change notification
+   * for the path `rootEntityType/fullSanitizedEntityName`.
+   */
+  private def changeEntityConfig(rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
+    // persist the configs first, so they are already in place when the notification is created
+    zkClient.setOrCreateEntityConfigs(rootEntityType, fullSanitizedEntityName, configs)
+
+    val notificationPath = s"$rootEntityType/$fullSanitizedEntityName"
+    zkClient.createConfigChangeNotification(notificationPath)
+  }
+
+  /**
+   * Reads the config of a single entity (topic, broker, client, user or <user, client>) through the zk client.
+   *
+   * @param rootEntityType the root entity type the entity belongs to
+   * @param sanitizedEntityName <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>
+   * @return the entity's configs as returned by `KafkaZkClient.getEntityConfigs`
+   */
+  def fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String): Properties =
+    zkClient.getEntityConfigs(rootEntityType, sanitizedEntityName)
+
+  /**
+   * Fetches the configs of every topic returned by the zk client.
+   * @return a map from topic name to that topic's configs
+   */
+  def getAllTopicConfigs(): Map[String, Properties] = {
+    val topics = zkClient.getAllTopicsInCluster
+    topics.map { topic => topic -> fetchEntityConfig(ConfigType.Topic, topic) }.toMap
+  }
+
+  /**
+   * Fetches the configs of every entity listed under the given entity type.
+   * @param entityType the entity type whose entities are looked up
+   * @return a map from entity name to that entity's configs
+   */
+  def fetchAllEntityConfigs(entityType: String): Map[String, Properties] = {
+    val entities = zkClient.getAllEntitiesWithConfig(entityType)
+    entities.map { entity => entity -> fetchEntityConfig(entityType, entity) }.toMap
+  }
+
+  /**
+   * Fetches the configs of all child entities of the form
+   * `<rootEntity>/<childEntityType>/<childEntity>` under the given root entity type.
+   *
+   * @param rootEntityType the root entity type whose entities are scanned
+   * @param childEntityType the child entity type looked up beneath every root entity
+   * @return a map from `<rootEntity>/<childEntityType>/<childEntity>` to the configs of that entity
+   */
+  def fetchAllChildEntityConfigs(rootEntityType: String, childEntityType: String): Map[String, Properties] = {
+    val rootEntities = zkClient.getAllEntitiesWithConfig(rootEntityType)
+
+    // Collect every child path first; the configs are only fetched once the listing is complete
+    val childEntityPaths = rootEntities.flatMap { rootEntity =>
+      val childTypePath = rootEntity + '/' + childEntityType
+      zkClient.getAllEntitiesWithConfig(rootEntityType + '/' + childTypePath)
+        .map(childEntity => childTypePath + '/' + childEntity)
+    }
+
+    childEntityPaths.map { entityPath =>
+      entityPath -> fetchEntityConfig(rootEntityType, entityPath)
+    }.toMap
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 24d7ba9..b419654 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
 import kafka.log.LogConfig
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
@@ -33,8 +33,8 @@ import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException}
 
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -168,7 +168,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       configResponse.resultCode match {
         case Code.OK =>
           val overrides = ConfigEntityZNode.decode(configResponse.data)
-          val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
+          val logConfig = LogConfig.fromProps(config, overrides)
           logConfigs.put(topic, logConfig)
         case Code.NONODE =>
           val logConfig = LogConfig.fromProps(config, new Properties)
@@ -180,32 +180,100 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Reads the configs stored for a given entity.
+   * @param rootEntityType entity type
+   * @param sanitizedEntityName entity name
+   * @return the decoded entity configs, or empty properties if the entity's config znode does not exist
+   */
+  def getEntityConfigs(rootEntityType: String, sanitizedEntityName: String): Properties = {
+    val configPath = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
+    val response = retryRequestUntilConnected(GetDataRequest(configPath))
+
+    if (response.resultCode == Code.OK)
+      ConfigEntityZNode.decode(response.data)
+    else if (response.resultCode == Code.NONODE)
+      new Properties()
+    else
+      throw response.resultException.get
+  }
+
+  /**
+   * Sets or creates the entity znode path with the given configs depending
+   * on whether it already exists or not.
+   * @param rootEntityType entity type
+   * @param sanitizedEntityName entity name
+   * @param config the configs to store for the entity
+   * @throws KeeperException if there is an error while setting or creating the znode
+   */
+  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties) = {
+    val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
+    // Encode once; the same bytes are used for the set attempt and, if needed, the create
+    val configData = ConfigEntityZNode.encode(config)
+
+    def set(data: Array[Byte]): SetDataResponse =
+      retryRequestUntilConnected(SetDataRequest(path, data, ZkVersion.NoVersion))
+
+    def create(data: Array[Byte]) =
+      createRecursive(path, data)
+
+    // Try to overwrite first and fall back to creating the znode only when it does not exist yet
+    val setDataResponse = set(configData)
+    setDataResponse.resultCode match {
+      case Code.NONODE => create(configData)
+      case _ => setDataResponse.resultException.foreach(e => throw e)
+    }
+  }
+
+  /**
+   * Lists the children of the config znode of the given entity type.
+   * @param entityType entity type
+   * @return the names of all entities found under that entity type
+   */
+  def getAllEntitiesWithConfig(entityType: String): Seq[String] =
+    getChildren(ConfigEntityTypeZNode.path(entityType))
+
+  /**
+   * Creates a persistent sequential config change notification znode.
+   * @param sanitizedEntityPath sanitized entity path to encode into the notification
+   * @throws KeeperException if there is an error while creating the znode
+   */
+  def createConfigChangeNotification(sanitizedEntityPath: String): Unit = {
+    val path = ConfigEntityChangeNotificationSequenceZNode.createPath
+    val payload = ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath)
+    val response = retryRequestUntilConnected(
+      CreateRequest(path, payload, acls(path), CreateMode.PERSISTENT_SEQUENTIAL))
+
+    response.resultCode match {
+      case Code.OK => // created, nothing more to do
+      case _ => response.resultException.foreach(e => throw e)
+    }
+  }
+
+  /**
    * Gets all brokers in the cluster.
    * @return sequence of brokers in the cluster.
    */
   def getAllBrokersInCluster: Seq[Broker] = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
-    getChildrenResponse.resultCode match {
-      case Code.OK =>
-        val brokerIds = getChildrenResponse.children.map(_.toInt)
-        val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
-        val getDataResponses = retryRequestsUntilConnected(getDataRequests)
-        getDataResponses.flatMap { getDataResponse =>
-          val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
-          getDataResponse.resultCode match {
-            case Code.OK =>
-              Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
-            case Code.NONODE => None
-            case _ => throw getDataResponse.resultException.get
-          }
-        }
-      case Code.NONODE =>
-        Seq.empty
-      case _ =>
-        throw getChildrenResponse.resultException.get
+    val brokerIds = getSortedBrokerList
+    val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+    getDataResponses.flatMap { getDataResponse =>
+      val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
+      getDataResponse.resultCode match {
+        case Code.OK =>
+          Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+        case Code.NONODE => None
+        case _ => throw getDataResponse.resultException.get
+      }
     }
   }
 
+
+  /**
+   * Gets the broker ids found under the broker ids znode, sorted in ascending order
+   */
+  def getSortedBrokerList(): Seq[Int] = {
+    val brokerIds = getChildren(BrokerIdsZNode.path).map(child => child.toInt)
+    brokerIds.sorted
+  }
+
   /**
    * Gets all topics in the cluster.
    * @return sequence of topics in the cluster.
@@ -221,6 +289,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Checks whether the given topic exists.
+   * @param topicName the name of the topic to check
+   * @return true if the topic exists, else false
+   */
+  def topicExists(topicName: String): Boolean = {
+    pathExists(TopicZNode.path(topicName))
+  }
+
+  /**
    * Sets the topic znode with the given assignment.
    * @param topic the topic whose assignment is being set.
    * @param assignment the partition to replica mapping to set for the given topic
@@ -232,6 +309,29 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Sets the topic znode with the given assignment.
+   * @param topic the topic whose assignment is being set.
+   * @param assignment the partition to replica mapping to set for the given topic
+   * @throws KeeperException if there is an error while setting assignment
+   */
+  def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
+    val setDataResponse = setTopicAssignmentRaw(topic, assignment)
+    if (setDataResponse.resultCode != Code.OK) {
+      setDataResponse.resultException.foreach(e => throw e)
+    }
+  }
+
+  /**
+   * Create the topic znode with the given assignment.
+   * @param topic the topic whose assignment is being set.
+   * @param assignment the partition to replica mapping to set for the given topic
+   * @throws KeeperException if there is an error while creating assignment
+   */
+  def createTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
+    createRecursive(TopicZNode.path(topic), TopicZNode.encode(assignment))
+  }
+
+  /**
    * Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path.
    * @return sequence of znode names and not the absolute znode path.
    */
@@ -271,7 +371,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteLogDirEventNotifications(getChildrenResponse.children)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
+      getChildrenResponse.resultException.foreach(e => throw e)
     }
   }
 
@@ -305,6 +405,40 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Gets the partition assignments for the given topics.
+   * @param topics the topics whose partitions we wish to get the assignments for.
+   * @return the partition assignment for each partition from the given topics.
+   */
+  def getPartitionAssignmentForTopics(topics: Set[String]):  Map[String, Map[Int, Seq[Int]]] = {
+    val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
+    getDataResponses.flatMap { getDataResponse =>
+      val topic = getDataResponse.ctx.get.asInstanceOf[String]
+       if (getDataResponse.resultCode == Code.OK) {
+        val partitionMap = TopicZNode.decode(topic, getDataResponse.data).map { case (k, v) => (k.partition, v) }
+        Map(topic -> partitionMap)
+      } else if (getDataResponse.resultCode == Code.NONODE) {
+        Map.empty[String, Map[Int, Seq[Int]]]
+      } else {
+        throw getDataResponse.resultException.get
+      }
+    }.toMap
+  }
+
+  /**
+   * Gets the partition numbers for the given topics
+   * @param topics the topics whose partitions we wish to get.
+   * @return the partition array for each topic from the given topics.
+   */
+  def getPartitionsForTopics(topics: Set[String]): Map[String, Seq[Int]] = {
+    getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
+      val topic = topicAndPartitionMap._1
+      val partitionMap = topicAndPartitionMap._2
+      topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t)
+    }
+  }
+
+  /**
    * Gets the partition count for a given topic
    * @param topic The topic to get partition count for.
    * @return  optional integer that is Some if the topic exists and None otherwise.
@@ -318,6 +452,16 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Gets the assigned replicas for a specific topic and partition
+   * @param topicPartition the topic partition to get assigned replicas for.
+   * @return List of assigned replicas
+   */
+  def getReplicasForPartition(topicPartition: TopicPartition): Seq[Int] = {
+    val topicData = getReplicaAssignmentForTopics(Set(topicPartition.topic))
+    topicData.getOrElse(topicPartition, Seq.empty)
+  }
+
+  /**
    * Gets the data and version at the given zk path
    * @param path zk node path
    * @return A tuple of 2 elements, where first element is zk node data as string
@@ -413,6 +557,25 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Creates the delete topic znode.
+   * @param topicName topic name
+   * @throws KeeperException if there is an error while creating the znode
+   */
+  def createDeleteTopicPath(topicName: String): Unit = {
+    createRecursive(DeleteTopicsTopicZNode.path(topicName))
+  }
+
+
+  /**
+   * Checks if the topic is marked for deletion.
+   * @param topic the topic to check
+   * @return true if the topic is marked for deletion, else false
+   */
+  def isTopicMarkedForDeletion(topic: String): Boolean = {
+    pathExists(DeleteTopicsTopicZNode.path(topic))
+  }
+
+  /**
    * Get all topics marked for deletion.
    * @return sequence of topics marked for deletion.
    */
@@ -479,6 +642,21 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Creates the partition reassignment znode with the given reassignment.
+   * @param reassignment the reassignment to set on the reassignment znode.
+   * @throws KeeperException if there is an error while creating the znode
+   */
+  def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]])  = {
+    val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
+      acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
+    val createResponse = retryRequestUntilConnected(createRequest)
+
+    if (createResponse.resultCode != Code.OK) {
+      throw createResponse.resultException.get
+    }
+  }
+
+  /**
    * Deletes the partition reassignment znode.
    */
   def deletePartitionReassignment(): Unit = {
@@ -487,6 +665,22 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Checks if reassign partitions is in progress
+   * @return true if reassign partitions is in progress, else false
+   */
+  def reassignPartitionsInProgress(): Boolean = {
+    pathExists(ReassignPartitionsZNode.path)
+  }
+
+  /**
+   * Gets all partitions being reassigned.
+   * @return the ReassignedPartitionsContext for each partition that is being reassigned.
+   */
+  def getPartitionsBeingReassigned(): Map[TopicPartition, ReassignedPartitionsContext] = {
+    getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+  }
+
+  /**
    * Gets topic partition states for the given partitions.
    * @param partitions the partitions for which we want to get states.
    * @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state.
@@ -504,6 +698,35 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Gets topic partition state for the given partition.
+   * @param partition the partition for which we want to get state.
+   * @return LeaderIsrAndControllerEpoch of the partition state if it exists, else None
+   */
+  def getTopicPartitionState(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+    val getDataResponse = getTopicPartitionStatesRaw(Seq(partition)).head
+    if (getDataResponse.resultCode == Code.OK) {
+      TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      None
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Gets the leader for a given partition.
+   * @param partition the partition whose leader we wish to get.
+   * @return optional integer that is Some(leader) if the partition state exists and None otherwise.
+   */
+  def getLeaderForPartition(partition: TopicPartition): Option[Int] = {
+    val leaderIsrEpoch = getTopicPartitionState(partition)
+    if (leaderIsrEpoch.isDefined)
+      Option(leaderIsrEpoch.get.leaderAndIsr.leader)
+    else
+      None
+  }
+
+  /**
    * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
    * @return sequence of znode names and not the absolute znode path.
    */
@@ -543,7 +766,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
+      getChildrenResponse.resultException.foreach(e => throw e)
     }
   }
 
@@ -641,9 +864,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * Creates the required zk nodes for Acl storage
    */
   def createAclPaths(): Unit = {
-    createRecursive(AclZNode.path)
-    createRecursive(AclChangeNotificationZNode.path)
-    ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name)))
+    createRecursive(AclZNode.path, throwIfPathExists = false)
+    createRecursive(AclChangeNotificationZNode.path, throwIfPathExists = false)
+    ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name), throwIfPathExists = false))
   }
 
   /**
@@ -719,7 +942,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteAclChangeNotifications(getChildrenResponse.children)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
+      getChildrenResponse.resultException.foreach(e => throw e)
     }
   }
 
@@ -735,7 +958,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val deleteResponses = retryRequestsUntilConnected(deleteRequests)
     deleteResponses.foreach { deleteResponse =>
       if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
-        throw deleteResponse.resultException.get
+        deleteResponse.resultException.foreach(e => throw e)
       }
     }
   }
@@ -790,7 +1013,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       case _ => throw deleteResponse.resultException.get
     }
   }
-  
+
   /**
    * Deletes the zk node recursively
    * @param path
@@ -889,7 +1112,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
   }
 
-   /**
+  /**
    * Set the committed offset for a topic partition and group
    * @param group the group whose offset is being set
    * @param topicPartition the topic partition whose offset is being set
@@ -898,8 +1121,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def setOrCreateConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): Unit = {
     val setDataResponse = setConsumerOffset(group, topicPartition, offset)
     if (setDataResponse.resultCode == Code.NONODE) {
-      val createResponse = createConsumerOffset(group, topicPartition, offset)
-      createResponse.resultException.foreach(e => throw e)
+      createConsumerOffset(group, topicPartition, offset)
     } else {
       setDataResponse.resultException.foreach(e => throw e)
     }
@@ -911,17 +1133,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     retryRequestUntilConnected(setDataRequest)
   }
 
-  private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): CreateResponse = {
+  private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long) = {
     val path = ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition)
-    val createRequest = CreateRequest(path, ConsumerOffset.encode(offset), acls(path), CreateMode.PERSISTENT)
-    var createResponse = retryRequestUntilConnected(createRequest)
-    if (createResponse.resultCode == Code.NONODE) {
-      val indexOfLastSlash = path.lastIndexOf("/")
-      if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
-      createRecursive(path.substring(0, indexOfLastSlash))
-      createResponse = retryRequestUntilConnected(createRequest)
-    }
-    createResponse
+    createRecursive(path, ConsumerOffset.encode(offset))
   }
 
   /**
@@ -955,21 +1169,40 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
   }
 
-  private[zk] def createRecursive(path: String): Unit = {
-    val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
-    var createResponse = retryRequestUntilConnected(createRequest)
-    if (createResponse.resultCode == Code.NONODE) {
+  private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
+
+    def parentPath(path: String): String = {
       val indexOfLastSlash = path.lastIndexOf("/")
       if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
-      val parentPath = path.substring(0, indexOfLastSlash)
-      createRecursive(parentPath)
-      createResponse = retryRequestUntilConnected(createRequest)
-      if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+      path.substring(0, indexOfLastSlash)
+    }
+
+    def createRecursive0(path: String): Unit = {
+      val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
+      var createResponse = retryRequestUntilConnected(createRequest)
+      if (createResponse.resultCode == Code.NONODE) {
+        createRecursive0(parentPath(path))
+        createResponse = retryRequestUntilConnected(createRequest)
+        if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+          throw createResponse.resultException.get
+        }
+      } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
         throw createResponse.resultException.get
       }
-    } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
-      throw createResponse.resultException.get
     }
+
+    val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT)
+    var createResponse = retryRequestUntilConnected(createRequest)
+
+    if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
+      createResponse.resultException.foreach(e => throw e)
+    } else if (createResponse.resultCode == Code.NONODE) {
+      createRecursive0(parentPath(path))
+      createResponse = retryRequestUntilConnected(createRequest)
+      createResponse.resultException.foreach(e => throw e)
+    } else if (createResponse.resultCode != Code.NODEEXISTS)
+      createResponse.resultException.foreach(e => throw e)
+
   }
 
   private def createTopicPartition(partitions: Seq[TopicPartition]): Seq[CreateResponse] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 4c618a0..a0085cd 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -28,8 +28,6 @@ import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
 
-import scala.collection.Seq
-
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
 
 object ControllerZNode {
@@ -140,16 +138,29 @@ object ConfigEntityZNode {
     import scala.collection.JavaConverters._
     Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala))
   }
-  def decode(bytes: Array[Byte]): Option[Properties] = {
-    Json.parseBytes(bytes).map { js =>
-      val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
-      val props = new Properties()
-      configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
-      props
+  def decode(bytes: Array[Byte]): Properties = {
+    val props = new Properties()
+    if (bytes != null) {
+      Json.parseBytes(bytes).map { js =>
+        val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
+        configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
+      }
     }
+    props
   }
 }
 
+object ConfigEntityChangeNotificationZNode {
+  def path = s"${ConfigZNode.path}/changes"
+}
+
+object ConfigEntityChangeNotificationSequenceZNode {
+  val SequenceNumberPrefix = "config_change_"
+  def createPath = s"${ConfigEntityChangeNotificationZNode.path}/$SequenceNumberPrefix"
+  def encode(sanitizedEntityPath : String): Array[Byte] = Json.encodeAsBytes(Map("version" -> 2, "entity_path" -> sanitizedEntityPath))
+  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+}
+
 object IsrChangeNotificationZNode {
   def path = "/isr_change_notification"
 }


[2/3] kafka git commit: KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 47d487e..3f51528 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -32,7 +32,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw
       "--replication-factor", replicationFactor.toString,
       "--disable-rack-aware",
       "--topic", "foo"))
-    kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
+    kafka.admin.TopicCommand.createTopic(zkClient, createOpts)
 
     val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
     val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 522fcd3..b701f13 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
 import kafka.admin.AdminClient
-import kafka.admin.AdminUtils
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
 import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
 import kafka.common.TopicAndPartition
@@ -447,9 +446,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
    */
   @Test
   def testAuthorizationWithTopicNotExisting() {
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    AdminUtils.deleteTopic(zkUtils, deleteTopic)
+    adminZkClient.deleteTopic(deleteTopic)
     TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
 
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index eadb488..0badda9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -22,7 +22,6 @@ import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import collection.JavaConverters._
-import kafka.admin.AdminUtils
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
@@ -375,7 +374,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
       case (topicPartition, replicas) => topicPartition.partition -> replicas
     }
-    AdminUtils.addPartitions(zkUtils, topic, existingAssignment, AdminUtils.getBrokerMetadatas(zkUtils), 2)
+    adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2)
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 383f139..3e08327 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -16,7 +16,6 @@ package kafka.api
 
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Sanitizer
@@ -54,6 +53,6 @@ class ClientIdQuotaTest extends BaseQuotaTest {
   }
 
   private def updateQuotaOverride(clientId: String, properties: Properties) {
-    AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(clientId), properties)
+    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(clientId), properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index cb5262d..0ae9d17 100644
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -18,7 +18,7 @@ package kafka.api
 
 import java.util.Properties
 
-import kafka.admin.{RackAwareMode, AdminUtils, RackAwareTest}
+import kafka.admin.{RackAwareMode, RackAwareTest}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
@@ -55,7 +55,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
       val assignment = zkUtils.getReplicaAssignmentForTopics(Seq(topic)).map { case (topicPartition, replicas) =>
         topicPartition.partition -> replicas
       }
-      val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+      val brokerMetadatas = adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
       val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
       assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
       checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index e25f886..453ac91 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -17,7 +17,6 @@ package kafka.api
 import java.io.File
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.server._
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Sanitizer
@@ -41,7 +40,7 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
     val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
+    adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
     waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
@@ -59,11 +58,11 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
 
   override def removeQuotaOverrides() {
     val emptyProps = new Properties
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
+    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
+    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
   }
 
   private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
+    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index b5d88c0..91a92fa 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -17,7 +17,6 @@ package kafka.api
 import java.io.File
 import java.util.Properties
 
-import kafka.admin.AdminUtils
 import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -45,7 +44,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
     this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
     super.setUp()
     val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default, defaultProps)
+    adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default, defaultProps)
     waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
   }
 
@@ -67,6 +66,6 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
   }
 
   private def updateQuotaOverride(properties: Properties) {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal), properties)
+    adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal), properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 075b1af..10c7737 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
 import kafka.utils.{Exit, TestUtils, ZkUtils}
@@ -60,7 +59,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
     // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
     // the metadata is propagated.
     def createTopic(zkUtils: ZkUtils, topic: String): Unit = {
-      AdminUtils.createTopic(zkUtils, topic, partitions = 1, replicationFactor = 2)
+      adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 79fc68f..d79ba32 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -73,7 +73,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testWrongReplicaCount(): Unit = {
     try {
-      AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+      adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 2,
         Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2))))
       fail("Add partitions should fail")
     } catch {
@@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testMissingPartition0(): Unit = {
     try {
-      AdminUtils.addPartitions(zkUtils, topic5, topic5Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+      adminZkClient.addPartitions(topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
         Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
       fail("Add partitions should fail")
     } catch {
@@ -95,7 +95,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testIncrementPartitions(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
+    adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 3)
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
     val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
@@ -123,7 +123,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testManualAssignmentOfReplicas(): Unit = {
     // Add 2 partitions
-    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3,
+    adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3,
       Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
@@ -152,7 +152,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testReplicaPlacementAllServers(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic3, topic3Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 7)
+    adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
@@ -179,7 +179,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testReplicaPlacementPartialServers(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
+    adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 87ce46e..acac907 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -21,8 +21,8 @@ import java.util.Properties
 import kafka.admin.ConfigCommand.ConfigCommandOptions
 import kafka.common.InvalidConfigException
 import kafka.server.ConfigEntityName
-import kafka.utils.{Logging, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.Logging
+import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.common.security.scram.ScramCredentialUtils
 import org.apache.kafka.common.utils.Sanitizer
 import org.easymock.EasyMock
@@ -95,7 +95,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   def shouldFailIfUnrecognisedEntityType(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test
@@ -106,14 +106,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
-    val configChange = new TestAdminUtils {
-      override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configChange: Properties): Unit = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def changeClientIdConfig(clientId: String, configChange: Properties): Unit = {
         assertEquals("my-client-id", clientId)
         assertEquals("b", configChange.get("a"))
         assertEquals("d", configChange.get("c"))
       }
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test
@@ -124,14 +125,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
-    val configChange = new TestAdminUtils {
-      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
         assertEquals("my-topic", topic)
         assertEquals("b", configChange.get("a"))
         assertEquals("d", configChange.get("c"))
       }
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test
@@ -142,14 +144,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=d"))
 
-    val configChange = new TestAdminUtils {
-      override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
         assertEquals(Seq(1), brokerIds)
         assertEquals("b", configChange.get("a"))
         assertEquals("d", configChange.get("c"))
       }
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test
@@ -160,15 +163,18 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
 
-    val configChange = new TestAdminUtils {
-      override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
         assertEquals(Seq(1), brokerIds)
         assertEquals("b", configChange.get("a"))
         assertEquals("d,e ,f", configChange.get("c"))
         assertEquals("h,i", configChange.get("g"))
       }
+
+      override def changeTopicConfig(topic: String, configs: Properties): Unit = {}
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test (expected = classOf[IllegalArgumentException])
@@ -178,7 +184,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "a=b"))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test (expected = classOf[IllegalArgumentException])
@@ -188,7 +194,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "a="))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test (expected = classOf[IllegalArgumentException])
@@ -198,7 +204,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-type", "brokers",
       "--alter",
       "--add-config", "a=[b,c,d=e"))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test (expected = classOf[InvalidConfigException])
@@ -208,7 +214,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--entity-type", "topics",
       "--alter",
       "--delete-config", "missing_config1, missing_config2"))
-    ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }
 
   @Test
@@ -219,8 +225,8 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       "--alter",
       "--delete-config", "a,c"))
 
-    val configChange = new TestAdminUtils {
-      override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+    case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
         val properties: Properties = new Properties
         properties.put("a", "b")
         properties.put("c", "d")
@@ -228,12 +234,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         properties
       }
 
-      override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+      override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
         assertEquals("f", configChange.get("e"))
         assertEquals(1, configChange.size())
       }
     }
-    ConfigCommand.alterConfig(null, createOpts, configChange)
+
+    ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
   }
 
   @Test
@@ -253,11 +260,11 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         "--delete-config", mechanism))
 
     val credentials = mutable.Map[String, Properties]()
-    case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends TestAdminUtils {
-      override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+    case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends AdminZkClient(zkClient) {
+      override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
         credentials.getOrElse(entityName, new Properties())
       }
-      override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configChange: Properties): Unit = {
+      override def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configChange: Properties): Unit = {
         assertEquals(user, sanitizedEntityName)
         assertEquals(mechanisms, configChange.keySet().asScala)
         for (mechanism <- mechanisms) {
@@ -397,17 +404,17 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def testQuotaDescribeEntities() {
-    val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+    val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
 
     def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) {
       val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))
       expectedFetches.foreach {
-        case (name, values) => EasyMock.expect(zkUtils.getAllEntitiesWithConfig(name)).andReturn(values)
+        case (name, values) => EasyMock.expect(zkClient.getAllEntitiesWithConfig(name)).andReturn(values)
       }
-      EasyMock.replay(zkUtils)
-      val entities = entity.getAllEntities(zkUtils)
+      EasyMock.replay(zkClient)
+      val entities = entity.getAllEntities(zkClient)
       assertEquals(expectedEntityNames, entities.map(e => e.fullSanitizedName))
-      EasyMock.reset(zkUtils)
+      EasyMock.reset(zkClient)
     }
 
     val clientId = "a-client"
@@ -456,4 +463,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
         Map("users" -> Seq("<default>", sanitizedPrincipal)) ++ defaultUserMap ++ userMap,
         Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
   }
+
+  case class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+    override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
+    override def fetchEntityConfig(entityType: String, entityName: String): Properties = {new Properties}
+    override def changeClientIdConfig(clientId: String, configs: Properties): Unit = {}
+    override def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties): Unit = {}
+    override def changeTopicConfig(topic: String, configs: Properties): Unit = {}
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 78f022a..1922aaf 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -46,7 +46,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = "test"
     servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
   }
 
@@ -61,7 +61,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     follower.shutdown()
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     // check if all replicas but the one that is shut down has deleted the log
     TestUtils.waitUntilTrue(() =>
       servers.filter(s => s.config.brokerId != follower.config.brokerId)
@@ -86,7 +86,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     follower.shutdown()
 
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     // shut down the controller to trigger controller failover during delete topic
     controller.shutdown()
 
@@ -112,7 +112,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     this.servers = allServers
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created.")
@@ -121,7 +121,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
     follower.shutdown()
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
     // the topic is being deleted
     // reassign partition 0
@@ -155,16 +155,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last
     val newPartition = new TopicPartition(topic, 1)
     // capture the brokers before we shutdown so that we don't fail validation in `addPartitions`
-    val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
+    val brokers = adminZkClient.getBrokerMetadatas()
     follower.shutdown()
     // wait until the broker has been removed from ZK to reduce non-determinism
     TestUtils.waitUntilTrue(() => zkUtils.getBrokerInfo(follower.config.brokerId).isEmpty,
       s"Follower ${follower.config.brokerId} was not removed from ZK")
     // add partitions to topic
-    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+    adminZkClient.addPartitions(topic, expectedReplicaAssignment, brokers, 2,
       Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     follower.startup()
     // test if topic deletion is resumed
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -178,12 +178,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   def testAddPartitionDuringDeleteTopic() {
     val topic = "test"
     servers = createTestTopicAndCluster(topic)
-    val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
+    val brokers = adminZkClient.getBrokerMetadatas()
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     // add partitions to topic
     val newPartition = new TopicPartition(topic, 1)
-    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+    adminZkClient.addPartitions(topic, expectedReplicaAssignment, brokers, 2,
       Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
@@ -198,10 +198,10 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topicPartition = new TopicPartition(topic, 0)
     servers = createTestTopicAndCluster(topic)
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // re-create topic on same replicas
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // wait until leader is elected
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
     // check if all replica logs are created
@@ -216,7 +216,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     servers = createTestTopicAndCluster(topic)
     // start topic deletion
     try {
-      AdminUtils.deleteTopic(zkUtils, "test2")
+      adminZkClient.deleteTopic("test2")
       fail("Expected UnknownTopicOrPartitionException")
     } catch {
       case _: UnknownTopicOrPartitionException => // expected exception
@@ -258,7 +258,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
    server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
     // delete topic
-    AdminUtils.deleteTopic(zkUtils, "test")
+    adminZkClient.deleteTopic("test")
     TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers)
   }
 
@@ -270,9 +270,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
     try {
       // start topic deletion
-      AdminUtils.deleteTopic(zkUtils, topic)
+      adminZkClient.deleteTopic(topic)
       // try to delete topic marked as deleted
-      AdminUtils.deleteTopic(zkUtils, topic)
+      adminZkClient.deleteTopic(topic)
       fail("Expected TopicAlreadyMarkedForDeletionException")
     }
     catch {
@@ -293,7 +293,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created")
@@ -316,7 +316,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val topic = topicPartition.topic
     servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
     // mark the topic for deletion
-    AdminUtils.deleteTopic(zkUtils, "test")
+    adminZkClient.deleteTopic("test")
     TestUtils.waitUntilTrue(() => !zkUtils.isTopicMarkedForDeletion(topic),
       "Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic))
     // verify that topic test is untouched

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 7000308..e367372 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -56,7 +56,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
   }
 
   @After
@@ -234,7 +234,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() {
     TestUtils.createOffsetsTopic(zkUtils, servers)
     val topic2 = "foo2"
-    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+    adminZkClient.createTopic(topic2, 2, 1)
 
     // run two consumers in the group consuming from a two-partition topic
     consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 6727fad..4b22898 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -30,7 +30,6 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
-
 class ListConsumerGroupTest extends KafkaServerTestHarness {
 
   val overridingProps = new Properties()
@@ -47,7 +46,7 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
   override def setUp() {
     super.setUp()
 
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
     props.setProperty("group.id", group)
     props.setProperty("zookeeper.connect", zkConnect)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 6853b16..8227487 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -88,7 +88,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
 
@@ -98,13 +98,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 1 (specific offset).")
 
     printConsumerGroup("new.group")
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
     consumerGroupCommand.close()
   }
 
   @Test
   def testResetOffsetsToLocalDateTime() {
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
     val calendar = Calendar.getInstance()
@@ -144,12 +144,12 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
 
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
   def testResetOffsetsToZonedDateTime() {
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
     TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
 
     val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
@@ -188,7 +188,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
 
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -230,7 +230,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -241,7 +241,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
 
     printConsumerGroup()
 
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -250,7 +250,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -260,7 +260,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 100 (latest by duration).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -269,7 +269,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -279,7 +279,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -288,7 +288,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -301,7 +301,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 200 (latest).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -310,7 +310,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -322,7 +322,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 100 (current).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
@@ -351,7 +351,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -362,7 +362,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 1 (specific offset).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -371,7 +371,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -383,7 +383,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 150 (current + 50).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -392,7 +392,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -405,7 +405,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 50 (current - 50).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -414,7 +414,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -426,7 +426,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest by shift).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -435,7 +435,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -447,7 +447,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 200 (latest by shift).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -456,7 +456,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
 
@@ -466,7 +466,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -475,7 +475,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+    adminZkClient.createTopic(topic1, 2, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
 
@@ -485,7 +485,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   @Test
@@ -498,8 +498,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 1, 1)
-    AdminUtils.createTopic(zkUtils, topic2, 1, 1)
+    adminZkClient.createTopic(topic1, 1, 1)
+    adminZkClient.createTopic(topic2, 1, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
     produceConsumeAndShutdown(consumerGroupCommand, 1, topic2, 100)
@@ -511,8 +511,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest).")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
-    AdminUtils.deleteTopic(zkUtils, topic2)
+    adminZkClient.deleteTopic(topic1)
+    adminZkClient.deleteTopic(topic2)
   }
 
   @Test
@@ -525,8 +525,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
-    AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+    adminZkClient.createTopic(topic1, 2, 1)
+    adminZkClient.createTopic(topic2, 2, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
     produceConsumeAndShutdown(consumerGroupCommand, 2, topic2, 100)
@@ -538,8 +538,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     }, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
-    AdminUtils.deleteTopic(zkUtils, topic2)
+    adminZkClient.deleteTopic(topic1)
+    adminZkClient.deleteTopic(topic2)
   }
 
   @Test
@@ -548,7 +548,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     val opts = new ConsumerGroupCommandOptions(cgcArgs)
     val consumerGroupCommand = createConsumerGroupService(opts)
 
-    AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+    adminZkClient.createTopic(topic1, 2, 1)
 
     produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
 
@@ -575,7 +575,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
     file.deleteOnExit()
 
     printConsumerGroup()
-    AdminUtils.deleteTopic(zkUtils, topic1)
+    adminZkClient.deleteTopic(topic1)
   }
 
   private def printConsumerGroup() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 53efa34..180f257 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -44,8 +44,8 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
       "--replication-factor", "1",
       "--config", cleanupKey + "=" + cleanupVal,
       "--topic", topic))
-    TopicCommand.createTopic(zkUtils, createOpts)
-    val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+    TopicCommand.createTopic(zkClient, createOpts)
+    val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
     assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
     assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
 
@@ -55,8 +55,8 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // modify the topic to add new partitions
     val numPartitionsModified = 3
     val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
-    TopicCommand.alterTopic(zkUtils, alterOpts)
-    val newProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+    TopicCommand.alterTopic(zkClient, alterOpts)
+    val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
     assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
     assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
   }
@@ -75,27 +75,27 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
       "--topic", normalTopic))
-    TopicCommand.createTopic(zkUtils, createOpts)
+    TopicCommand.createTopic(zkClient, createOpts)
 
     // delete the NormalTopic
     val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
     val deletePath = getDeleteTopicPath(normalTopic)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deletePath))
-    TopicCommand.deleteTopic(zkUtils, deleteOpts)
+    TopicCommand.deleteTopic(zkClient, deleteOpts)
     assertTrue("Delete path for topic should exist after deletion.", zkUtils.pathExists(deletePath))
 
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
       "--topic", Topic.GROUP_METADATA_TOPIC_NAME))
-    TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
+    TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
 
     // try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
     val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
     val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
-      TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
+      TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
     }
     assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
   }
@@ -109,12 +109,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // delete a topic that does not exist without --if-exists
     val deleteOpts = new TopicCommandOptions(Array("--topic", "test"))
     intercept[IllegalArgumentException] {
-      TopicCommand.deleteTopic(zkUtils, deleteOpts)
+      TopicCommand.deleteTopic(zkClient, deleteOpts)
     }
 
     // delete a topic that does not exist with --if-exists
     val deleteExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
-    TopicCommand.deleteTopic(zkUtils, deleteExistsOpts)
+    TopicCommand.deleteTopic(zkClient, deleteExistsOpts)
   }
 
   @Test
@@ -126,12 +126,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // alter a topic that does not exist without --if-exists
     val alterOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1"))
     intercept[IllegalArgumentException] {
-      TopicCommand.alterTopic(zkUtils, alterOpts)
+      TopicCommand.alterTopic(zkClient, alterOpts)
     }
 
     // alter a topic that does not exist with --if-exists
     val alterExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1", "--if-exists"))
-    TopicCommand.alterTopic(zkUtils, alterExistsOpts)
+    TopicCommand.alterTopic(zkClient, alterExistsOpts)
   }
 
   @Test
@@ -146,17 +146,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // create the topic
     val createOpts = new TopicCommandOptions(
       Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic))
-    TopicCommand.createTopic(zkUtils, createOpts)
+    TopicCommand.createTopic(zkClient, createOpts)
 
     // try to re-create the topic without --if-not-exists
     intercept[TopicExistsException] {
-      TopicCommand.createTopic(zkUtils, createOpts)
+      TopicCommand.createTopic(zkClient, createOpts)
     }
 
     // try to re-create the topic with --if-not-exists
     val createNotExistsOpts = new TopicCommandOptions(
       Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic, "--if-not-exists"))
-    TopicCommand.createTopic(zkUtils, createNotExistsOpts)
+    TopicCommand.createTopic(zkClient, createNotExistsOpts)
   }
 
   @Test
@@ -170,7 +170,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
       "--partitions", numPartitions.toString,
       "--replication-factor", replicationFactor.toString,
       "--topic", "foo"))
-    TopicCommand.createTopic(zkUtils, createOpts)
+    TopicCommand.createTopic(zkClient, createOpts)
 
     var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
       tp.partition -> replicas
@@ -182,7 +182,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     val alterOpts = new TopicCommandOptions(Array(
       "--partitions", alteredNumPartitions.toString,
       "--topic", "foo"))
-    TopicCommand.alterTopic(zkUtils, alterOpts)
+    TopicCommand.alterTopic(zkClient, alterOpts)
     assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
       tp.partition -> replicas
     }
@@ -198,28 +198,28 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     TestUtils.createBrokersInZk(zkUtils, brokers)
 
     val createOpts = new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", topic))
-    TopicCommand.createTopic(zkUtils, createOpts)
+    TopicCommand.createTopic(zkClient, createOpts)
 
     // delete the broker first, so when we attempt to delete the topic it gets into "marked for deletion"
     TestUtils.deleteBrokersInZk(zkUtils, brokers)
-    TopicCommand.deleteTopic(zkUtils, new TopicCommandOptions(Array("--topic", topic)))
+    TopicCommand.deleteTopic(zkClient, new TopicCommandOptions(Array("--topic", topic)))
 
     // Test describe topics
     def describeTopicsWithConfig() {
-      TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe")))
+      TopicCommand.describeTopic(zkClient, new TopicCommandOptions(Array("--describe")))
     }
     val outputWithConfig = TestUtils.grabConsoleOutput(describeTopicsWithConfig)
     assertTrue(outputWithConfig.contains(topic) && outputWithConfig.contains(markedForDeletionDescribe))
 
     def describeTopicsNoConfig() {
-      TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
+      TopicCommand.describeTopic(zkClient, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
     }
     val outputNoConfig = TestUtils.grabConsoleOutput(describeTopicsNoConfig)
     assertTrue(outputNoConfig.contains(topic) && outputNoConfig.contains(markedForDeletionDescribe))
 
     // Test list topics
     def listTopics() {
-      TopicCommand.listTopics(zkUtils, new TopicCommandOptions(Array("--list")))
+      TopicCommand.listTopics(zkClient, new TopicCommandOptions(Array("--list")))
     }
     val output = TestUtils.grabConsoleOutput(listTopics)
     assertTrue(output.contains(topic) && output.contains(markedForDeletionList))

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 04f9bea..32e23cc 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -20,7 +20,6 @@ package kafka.controller
 import java.util.Properties
 import java.util.concurrent.CountDownLatch
 
-import kafka.admin.AdminUtils
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils._
@@ -60,7 +59,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
     }
     val initialEpoch = initialController.epoch
     // Create topic with one partition
-    AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
     val topicPartition = new TopicPartition("topic1", 0)
     TestUtils.waitUntilTrue(() =>
       initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index bec5026..daa4276 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -26,7 +26,6 @@ import org.junit.{Before, Test}
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Gauge
 
-
 class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with Logging {
 
   private val nodesNum = 3
@@ -136,7 +135,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
       // Create topics
       for (t <- topics if running) {
         try {
-          kafka.admin.AdminUtils.createTopic(zkUtils, t, partitionNum, replicationFactor)
+          adminZkClient.createTopic(t, partitionNum, replicationFactor)
         } catch {
           case e: Exception => e.printStackTrace
         }
@@ -146,7 +145,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
       // Delete topics
       for (t <- topics if running) {
           try {
-              kafka.admin.AdminUtils.deleteTopic(zkUtils, t)
+              adminZkClient.deleteTopic(t)
           } catch {
           case e: Exception => e.printStackTrace
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6e9e8ff..e93dcf6 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.integration
 
-import kafka.admin.AdminUtils
 import kafka.api.TopicMetadataResponse
 import kafka.client.ClientUtils
 import kafka.cluster.BrokerEndPoint
@@ -219,7 +218,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
 
     // create topic
     val topic: String = "test"
-    AdminUtils.createTopic(zkUtils, topic, 1, numBrokers)
+    adminZkClient.createTopic(topic, 1, numBrokers)
 
     // shutdown a broker
     adHocServers.last.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 24421d0..7732e38 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -25,7 +25,6 @@ import org.apache.log4j.{Level, Logger}
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import kafka.admin.AdminUtils
 import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.serializer.StringDecoder
 import kafka.server.{KafkaConfig, KafkaServer}
@@ -109,7 +108,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
 
     verifyUncleanLeaderElectionEnabled
   }
@@ -121,7 +120,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
 
     verifyUncleanLeaderElectionDisabled
   }
@@ -136,7 +135,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", "true")
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
       topicProps)
 
     verifyUncleanLeaderElectionEnabled
@@ -153,7 +152,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", "false")
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
       topicProps)
 
     verifyUncleanLeaderElectionDisabled
@@ -168,7 +167,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     topicProps.put("unclean.leader.election.enable", "invalid")
 
     intercept[ConfigException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
+      adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1)), topicProps)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 0dcff53..c250d85 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -28,7 +28,6 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import kafka.serializer._
 import kafka.utils._
-import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
 
 import scala.collection._
@@ -73,8 +72,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testMetricsReporterAfterDeletingTopic() {
     val topic = "test-topic-metric"
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.createTopic(topic, 1, 1)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
   }
@@ -82,13 +81,13 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
     val topic = "test-broker-topic-metric"
-    AdminUtils.createTopic(zkUtils, topic, 2, 1)
+    adminZkClient.createTopic(topic, 2, 1)
     // Produce a few messages to create the metrics
     // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
     TestUtils.produceMessages(servers, topic, nMessages)
     assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
     servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index fa1174d..a0680a2 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -125,7 +125,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     props.put("request.required.acks", "0")
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    AdminUtils.createTopic(zkUtils, "test", 1, 1)
+    adminZkClient.createTopic("test", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0)
 
     // This message will be dropped silently since message size too large.
@@ -167,9 +167,9 @@ class SyncProducerTest extends KafkaServerTestHarness {
     }
 
     // #2 - test that we get correct offsets when partition is owned by broker
-    AdminUtils.createTopic(zkUtils, "topic1", 1, 1)
+    adminZkClient.createTopic("topic1", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic1", 0)
-    AdminUtils.createTopic(zkUtils, "topic3", 1, 1)
+    adminZkClient.createTopic("topic3", 1, 1)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic3", 0)
 
     val response2 = producer.send(request)
@@ -243,7 +243,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas","2")
-    AdminUtils.createTopic(zkUtils, topicName, 1, 1,topicProps)
+    adminZkClient.createTopic(topicName, 1, 1,topicProps)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topicName, 0)
 
     val response = producer.send(produceRequest(topicName, 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index dd7bb5f..85bd6a1 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -26,7 +26,7 @@ import org.easymock.EasyMock
 import org.junit.Test
 import kafka.integration.KafkaServerTestHarness
 import kafka.utils._
-import kafka.admin.{AdminOperationException, AdminUtils}
+import kafka.admin.AdminOperationException
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.Map
@@ -43,14 +43,14 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val tp = new TopicPartition("test", 0)
     val logProps = new Properties()
     logProps.put(FlushMessagesProp, oldVal.toString)
-    AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
+    adminZkClient.createTopic(tp.topic, 1, 1, logProps)
     TestUtils.retry(10000) {
       val logOpt = this.servers.head.logManager.getLog(tp)
       assertTrue(logOpt.isDefined)
       assertEquals(oldVal, logOpt.get.config.flushInterval)
     }
     logProps.put(FlushMessagesProp, newVal.toString)
-    AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
+    adminZkClient.changeTopicConfig(tp.topic, logProps)
     TestUtils.retry(10000) {
       assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval)
     }
@@ -65,8 +65,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     val quotaManagers = servers.head.apis.quotas
     rootEntityType match {
-      case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, props)
-      case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, configEntityName, props)
+      case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props)
+      case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props)
     }
 
     TestUtils.retry(10000) {
@@ -84,8 +84,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
 
     val emptyProps = new Properties()
     rootEntityType match {
-      case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, emptyProps)
-      case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, configEntityName, emptyProps)
+      case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, emptyProps)
+      case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, emptyProps)
     }
     TestUtils.retry(10000) {
       val producerQuota = quotaManagers.produce.quota(user, clientId)
@@ -142,9 +142,9 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     userClientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "100000")
     userClientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "200000")
 
-    AdminUtils.changeClientIdConfig(zkUtils, "overriddenClientId", clientIdProps)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "overriddenUser", userProps)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
+    adminZkClient.changeClientIdConfig("overriddenClientId", clientIdProps)
+    adminZkClient.changeUserOrUserClientIdConfig("overriddenUser", userProps)
+    adminZkClient.changeUserOrUserClientIdConfig("ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
 
     // Remove config change znodes to force quota initialization only through loading of user/client quotas
     zkUtils.getChildren(ZkUtils.ConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.ConfigChangesPath + "/" + p) }
@@ -165,7 +165,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     try {
       val logProps = new Properties()
       logProps.put(FlushMessagesProp, 10000: java.lang.Integer)
-      AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
+      adminZkClient.changeTopicConfig(topic, logProps)
       fail("Should fail with AdminOperationException for topic doesn't exist")
     } catch {
       case _: AdminOperationException => // expected
@@ -187,7 +187,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     EasyMock.expectLastCall().once()
     EasyMock.replay(handler)
 
-    val configManager = new DynamicConfigManager(zkUtils, zkClient, Map(ConfigType.Topic -> handler))
+    val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
     // Notifications created using the old TopicConfigManager are ignored.
     configManager.ConfigChangedNotificationHandler.processNotification("not json")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 9e6b1b2..b2378cf 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -16,50 +16,39 @@
   */
 package kafka.server
 
-import kafka.admin.AdminUtils
-import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.common.config._
-import org.easymock.EasyMock
-import org.junit.{Before, Test}
 import kafka.utils.CoreUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.config._
+import org.junit.Test
 
-class DynamicConfigTest {
+class DynamicConfigTest  extends ZooKeeperTestHarness {
   private final val nonExistentConfig: String = "some.config.that.does.not.exist"
   private final val someValue: String = "some interesting value"
 
-  var zkUtils: ZkUtils = _
-
-  @Before
-  def setUp() {
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
-  }
-
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingBrokerUnknownConfig() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), propsWith(nonExistentConfig, someValue))
+    adminZkClient.changeBrokerConfig(Seq(0), propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingClientIdUnknownConfig() {
-    AdminUtils.changeClientIdConfig(zkUtils, "ClientId", propsWith(nonExistentConfig, someValue))
+    adminZkClient.changeClientIdConfig("ClientId", propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingUserUnknownConfig() {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "UserId", propsWith(nonExistentConfig, someValue))
+    adminZkClient.changeUserOrUserClientIdConfig("UserId", propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[ConfigException])
   def shouldFailLeaderConfigsWithInvalidValues() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+    adminZkClient.changeBrokerConfig(Seq(0),
       propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "-100"))
   }
 
   @Test(expected = classOf[ConfigException])
   def shouldFailFollowerConfigsWithInvalidValues() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+    adminZkClient.changeBrokerConfig(Seq(0),
       propsWith(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "-100"))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cabbd5d..a51acd0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -90,7 +90,6 @@ class KafkaApisTest {
       groupCoordinator,
       txnCoordinator,
       controller,
-      zkUtils,
       zkClient,
       brokerId,
       new KafkaConfig(properties),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 306dbc0..b0a7d72 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, Random}
 
-import kafka.admin.AdminUtils
 import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.common.TopicAndPartition
 import kafka.consumer.SimpleConsumer
@@ -81,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in ZooKeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
     waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
@@ -116,7 +115,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in ZooKeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    adminZkClient.createTopic(topic, 1, 1)
 
     val logManager = server.getLogManager
     waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
@@ -179,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in ZooKeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkUtils, topic, 3, 1)
+    adminZkClient.createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)
@@ -208,7 +207,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in ZooKeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkUtils, topic, 3, 1)
+    adminZkClient.createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index bb7e9fe..61e17e3 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -30,8 +30,6 @@ import org.junit.Assert._
 import java.util.Properties
 import java.io.File
 
-import kafka.admin.AdminUtils
-
 import scala.util.Random
 import scala.collection._
 
@@ -319,7 +317,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     assertEquals(Errors.NONE, commitResponse.commitStatus.get(topicPartition).get)
 
     // start topic deletion
-    AdminUtils.deleteTopic(zkUtils, topic)
+    adminZkClient.deleteTopic(topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, Seq(server))
     Thread.sleep(retentionCheckInterval * 2)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index f162492..45a6bdd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -19,8 +19,6 @@ package kafka.server
 
 import java.util.Properties
 
-import kafka.admin.AdminUtils
-import kafka.admin.AdminUtils._
 import kafka.log.LogConfig._
 import kafka.server.KafkaConfig.fromProps
 import kafka.server.QuotaType._
@@ -80,7 +78,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet)
     //And two extra partitions 6,7, which we don't intend on throttling.
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
       0 -> Seq(100, 106), //Throttled
       1 -> Seq(101, 106), //Throttled
       2 -> Seq(102, 106), //Throttled
@@ -99,7 +97,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Set the throttle limit on all 8 brokers, but only assign throttled replicas to the six leaders, or two followers
     (100 to 107).foreach { brokerId =>
-      changeBrokerConfig(zkUtils, Seq(brokerId),
+      adminZkClient.changeBrokerConfig(Seq(brokerId),
         propsWith(
           (DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString),
           (DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
@@ -108,9 +106,9 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Either throttle the six leaders or the two followers
     if (leaderThrottle)
-      changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
+      adminZkClient.changeTopicConfig(topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
     else
-      changeTopicConfig(zkUtils, topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
+      adminZkClient.changeTopicConfig(topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
 
     //Add data equally to each partition
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
@@ -178,7 +176,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val config: Properties = createBrokerConfig(100, zkConnect)
     config.put("log.segment.bytes", (1024 * 1024).toString)
     brokers = Seq(createServer(fromProps(config)))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
 
     //Write 20MBs and throttle at 5MB/s
     val msg = msg100KB
@@ -187,8 +185,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val throttle: Long = msg.length * msgCount / expectedDuration
 
     //Set the throttle to only limit leader
-    changeBrokerConfig(zkUtils, Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
-    changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
+    adminZkClient.changeBrokerConfig(Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
+    adminZkClient.changeTopicConfig(topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
 
     //Add data
     addData(msgCount, msg)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 4774e1d..ee75933 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -19,7 +19,6 @@ import java.nio.ByteBuffer
 import java.util.{Collections, LinkedHashMap, Properties}
 import java.util.concurrent.{Executors, Future, TimeUnit}
 
-import kafka.admin.AdminUtils
 import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
@@ -81,9 +80,9 @@ class RequestQuotaTest extends BaseRequestTest {
     // Change default client-id request quota to a small value and a single unthrottledClient with a large quota
     val quotaProps = new Properties()
     quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
-    AdminUtils.changeClientIdConfig(zkUtils, "<default>", quotaProps)
+    adminZkClient.changeClientIdConfig("<default>", quotaProps)
     quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
-    AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(unthrottledClientId), quotaProps)
+    adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
 
     TestUtils.retry(10000) {
       val quotaManager = servers.head.apis.quotas.request