Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/22 21:25:59 UTC

[3/3] kafka git commit: KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager

KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager

* Add AdminZkClient class
* Use KafkaZkClient and AdminZkClient in ConfigCommand and TopicCommand
* All existing tests should continue to pass

Author: Manikumar Reddy <ma...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #4194 from omkreddy/KAFKA-5646-ZK-ADMIN-UTILS-DYNAMIC-MANAGER
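
The gist of the change: AdminZkClient wraps a KafkaZkClient and takes over the topic- and config-administration logic that previously lived in AdminUtils on top of ZkUtils. A minimal sketch of the new call pattern, mirroring the updated command-line tools below (the connect string, timeouts, and topic name are illustrative, not part of this commit):

    import kafka.server.ConfigType
    import kafka.zk.{AdminZkClient, KafkaZkClient}
    import kafka.zookeeper.ZooKeeperClient
    import org.apache.kafka.common.security.JaasUtils

    // build the ZooKeeper session, the typed Kafka client, and the admin facade
    val zooKeeperClient = new ZooKeeperClient("localhost:2181", 30000, 30000, Int.MaxValue)
    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
    val adminZkClient = new AdminZkClient(zkClient)
    try {
      // e.g. read a topic's dynamic configs through the new facade
      val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, "test-topic")
      println(props)
    } finally zkClient.close()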


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc852baf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc852baf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc852baf

Branch: refs/heads/trunk
Commit: bc852baffbf602ead9cb719a01747de414940d53
Parents: c5f31fe
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Nov 22 13:25:52 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 22 13:25:52 2017 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  15 +
 .../main/scala/kafka/admin/ConfigCommand.scala  |  37 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  86 ++--
 .../main/scala/kafka/server/AdminManager.scala  |  32 +-
 .../kafka/server/DynamicConfigManager.scala     |  16 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  10 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   6 +-
 .../src/main/scala/kafka/zk/AdminZkClient.scala | 417 +++++++++++++++++++
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 339 ++++++++++++---
 core/src/main/scala/kafka/zk/ZkData.scala       |  27 +-
 .../ReassignPartitionsIntegrationTest.scala     |   2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   5 +-
 .../kafka/api/BaseProducerSendTest.scala        |   3 +-
 .../kafka/api/ClientIdQuotaTest.scala           |   3 +-
 .../api/RackAwareAutoTopicCreationTest.scala    |   4 +-
 .../kafka/api/UserClientIdQuotaTest.scala       |   9 +-
 .../integration/kafka/api/UserQuotaTest.scala   |   5 +-
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   3 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  12 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |  78 ++--
 .../unit/kafka/admin/DeleteTopicTest.scala      |  38 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala |   4 +-
 .../kafka/admin/ListConsumerGroupTest.scala     |   3 +-
 .../admin/ResetConsumerGroupOffsetTest.scala    |  80 ++--
 .../unit/kafka/admin/TopicCommandTest.scala     |  44 +-
 .../controller/ControllerFailoverTest.scala     |   3 +-
 ...MetricsDuringTopicCreationDeletionTest.scala |   5 +-
 .../kafka/integration/TopicMetadataTest.scala   |   3 +-
 .../integration/UncleanLeaderElectionTest.scala |  11 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala  |   9 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |   8 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  24 +-
 .../unit/kafka/server/DynamicConfigTest.scala   |  29 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |   1 -
 .../scala/unit/kafka/server/LogOffsetTest.scala |   9 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   4 +-
 .../kafka/server/ReplicationQuotasTest.scala    |  16 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |   5 +-
 ...rivenReplicationProtocolAcceptanceTest.scala |   9 +-
 .../epoch/LeaderEpochIntegrationTest.scala      |   7 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala | 323 ++++++++++++++
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 101 ++++-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |   3 +
 43 files changed, 1459 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 32cab2a..09a65af 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -32,6 +32,7 @@ import scala.collection.JavaConverters._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.internals.Topic
 
+@deprecated("This class is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
 trait AdminUtilities {
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
   def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
@@ -267,6 +268,7 @@ object AdminUtils extends Logging with AdminUtilities {
   * @param validateOnly If true, validate the parameters without actually adding the partitions
   * @return the updated replica assignment
   */
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def addPartitions(zkUtils: ZkUtils,
                     topic: String,
                     existingAssignment: Map[Int, Seq[Int]],
@@ -359,6 +361,7 @@ object AdminUtils extends Logging with AdminUtilities {
     }
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def deleteTopic(zkUtils: ZkUtils, topic: String) {
       if (topicExists(zkUtils, topic)) {
         try {
@@ -434,6 +437,7 @@ object AdminUtils extends Logging with AdminUtilities {
   def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
     zkUtils.pathExists(getTopicPath(topic))
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
                          brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
     val allBrokers = zkUtils.getAllBrokersInCluster()
@@ -452,6 +456,7 @@ object AdminUtils extends Logging with AdminUtilities {
     brokerMetadatas.sortBy(_.id)
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def createTopic(zkUtils: ZkUtils,
                   topic: String,
                   partitions: Int,
@@ -463,6 +468,7 @@ object AdminUtils extends Logging with AdminUtilities {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def validateCreateOrUpdateTopic(zkUtils: ZkUtils,
                                   topic: String,
                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -501,6 +507,7 @@ object AdminUtils extends Logging with AdminUtilities {
       LogConfig.validate(config)
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                      topic: String,
                                                      partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -548,6 +555,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *                 existing configs need to be deleted, it should be done prior to invoking this API
    *
    */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) {
     DynamicConfig.Client.validate(configs)
     changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
@@ -564,6 +572,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *                 existing configs need to be deleted, it should be done prior to invoking this API
    *
    */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
     if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
       DynamicConfig.Client.validate(configs)
@@ -589,6 +598,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *                 existing configs need to be deleted, it should be done prior to invoking this API
    *
    */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
     validateTopicConfig(zkUtils, topic, configs)
     changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
@@ -602,6 +612,7 @@ object AdminUtils extends Logging with AdminUtilities {
     * @param brokers: The list of brokers to apply config changes to
     * @param configs: The config to change, as properties
     */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
     DynamicConfig.Broker.validate(configs)
     brokers.foreach { broker =>
@@ -637,6 +648,7 @@ object AdminUtils extends Logging with AdminUtilities {
    * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
    * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
    */
+   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
     val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
     // readDataMaybeNull returns Some(null) if the path exists, but there is no data
@@ -657,12 +669,15 @@ object AdminUtils extends Logging with AdminUtilities {
     props
   }
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
     zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
     zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
 
+  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
   def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
     def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
       val root = rootPath match {

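Every ZkUtils-based entry point above is now annotated @deprecated in favor of kafka.zk.AdminZkClient. As a hedged before/after sketch of the migration (zkUtils and zkClient stand for the respective handles; the topic name is illustrative):

    // before: static helper on a ZkUtils handle (deprecated as of 1.1.0)
    val oldProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, "test-topic")

    // after: the same read on an AdminZkClient built from a KafkaZkClient
    val adminZkClient = new AdminZkClient(zkClient)
    val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, "test-topic")
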
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index febf40f..077ecce 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -24,8 +24,10 @@ import kafka.common.Config
 import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils.{CommandLineUtils, ZkUtils}
+import kafka.utils.CommandLineUtils
 import kafka.utils.Implicits._
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
 import org.apache.kafka.common.utils.{Sanitizer, Utils}
@@ -61,26 +63,25 @@ object ConfigCommand extends Config {
 
     opts.checkArgs()
 
-    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
-                          30000,
-                          30000,
-                          JaasUtils.isZkSecurityEnabled())
+    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue)
+    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+    val adminZkClient = new AdminZkClient(zkClient)
 
     try {
       if (opts.options.has(opts.alterOpt))
-        alterConfig(zkUtils, opts)
+        alterConfig(zkClient, opts, adminZkClient)
       else if (opts.options.has(opts.describeOpt))
-        describeConfig(zkUtils, opts)
+        describeConfig(zkClient, opts, adminZkClient)
     } catch {
       case e: Throwable =>
         println("Error while executing config command " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
-      zkUtils.close()
+      zkClient.close()
     }
   }
 
-  private[admin] def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) {
+  private[admin] def alterConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
     val configsToBeAdded = parseConfigsToBeAdded(opts)
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
     val entity = parseEntity(opts)
@@ -91,7 +92,7 @@ object ConfigCommand extends Config {
       preProcessScramCredentials(configsToBeAdded)
 
     // compile the final set of configs
-    val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName)
+    val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
 
     // fail the command if any of the configs to be deleted does not exist
     val invalidConfigs = configsToBeDeleted.filterNot(configs.containsKey(_))
@@ -101,7 +102,7 @@ object ConfigCommand extends Config {
     configs ++= configsToBeAdded
     configsToBeDeleted.foreach(configs.remove(_))
 
-    utils.changeConfigs(zkUtils, entityType, entityName, configs)
+    adminZkClient.changeConfigs(entityType, entityName, configs)
 
     println(s"Completed Updating config for entity: $entity.")
   }
@@ -127,12 +128,12 @@ object ConfigCommand extends Config {
     }
   }
 
-  private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
+  private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
     val configEntity = parseEntity(opts)
     val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
-    val entities = configEntity.getAllEntities(zkUtils)
+    val entities = configEntity.getAllEntities(zkClient)
     for (entity <- entities) {
-      val configs = AdminUtils.fetchEntityConfig(zkUtils, entity.root.entityType, entity.fullSanitizedName)
+      val configs = adminZkClient.fetchEntityConfig(entity.root.entityType, entity.fullSanitizedName)
       // When describing all users, don't include empty user nodes with only <user, client> quota overrides.
       if (!configs.isEmpty || !describeAllUsers) {
         println("Configs for %s are %s"
@@ -196,7 +197,7 @@ object ConfigCommand extends Config {
   case class ConfigEntity(root: Entity, child: Option[Entity]) {
     val fullSanitizedName = root.sanitizedName.getOrElse("") + child.map(s => "/" + s.entityPath).getOrElse("")
 
-    def getAllEntities(zkUtils: ZkUtils) : Seq[ConfigEntity] = {
+    def getAllEntities(zkClient: KafkaZkClient) : Seq[ConfigEntity] = {
       // Describe option examples:
       //   Describe entity with specified name:
       //     --entity-type topics --entity-name topic1 (topic1)
@@ -211,19 +212,19 @@ object ConfigCommand extends Config {
       //     --entity-type users --entity-default --entity-type clients --entity-default (Default <user, client>)
       (root.sanitizedName, child) match {
         case (None, _) =>
-          val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
+          val rootEntities = zkClient.getAllEntitiesWithConfig(root.entityType)
                                    .map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
           child match {
             case Some(s) =>
                 rootEntities.flatMap(rootEntity =>
-                  ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
+                  ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkClient))
             case None => rootEntities
           }
         case (_, Some(childEntity)) =>
           childEntity.sanitizedName match {
             case Some(_) => Seq(this)
             case None =>
-                zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
+                zkClient.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
                        .map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
 
           }

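alterConfig and describeConfig now take the AdminZkClient explicitly instead of defaulting to the AdminUtilities singleton, which also makes the command path easy to drive from tests. A sketch under that assumption (alterConfig is private[admin], so this must run from the kafka.admin package; the option values are illustrative):

    val opts = new ConfigCommand.ConfigCommandOptions(Array(
      "--zookeeper", "localhost:2181",
      "--entity-type", "topics", "--entity-name", "test-topic",
      "--alter", "--add-config", "retention.ms=60000"))
    ConfigCommand.alterConfig(zkClient, opts, new AdminZkClient(zkClient))
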
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f2a74a0..bdd8aaf 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -25,18 +25,19 @@ import kafka.utils.Implicits._
 import kafka.consumer.Whitelist
 import kafka.log.LogConfig
 import kafka.server.ConfigType
-import kafka.utils.ZkUtils._
 import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
+import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConverters._
 import scala.collection._
 
-
 object TopicCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
@@ -53,36 +54,35 @@ object TopicCommand extends Logging {
 
     opts.checkArgs()
 
-    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
-                          30000,
-                          30000,
-                          JaasUtils.isZkSecurityEnabled())
+    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue)
+    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+
     var exitCode = 0
     try {
       if(opts.options.has(opts.createOpt))
-        createTopic(zkUtils, opts)
+        createTopic(zkClient, opts)
       else if(opts.options.has(opts.alterOpt))
-        alterTopic(zkUtils, opts)
+        alterTopic(zkClient, opts)
       else if(opts.options.has(opts.listOpt))
-        listTopics(zkUtils, opts)
+        listTopics(zkClient, opts)
       else if(opts.options.has(opts.describeOpt))
-        describeTopic(zkUtils, opts)
+        describeTopic(zkClient, opts)
       else if(opts.options.has(opts.deleteOpt))
-        deleteTopic(zkUtils, opts)
+        deleteTopic(zkClient, opts)
     } catch {
       case e: Throwable =>
         println("Error while executing topic command : " + e.getMessage)
         error(Utils.stackTrace(e))
         exitCode = 1
     } finally {
-      zkUtils.close()
+      zkClient.close()
       Exit.exit(exitCode)
     }
 
   }
 
-  private def getTopics(zkUtils: ZkUtils, opts: TopicCommandOptions): Seq[String] = {
-    val allTopics = zkUtils.getAllTopics().sorted
+  private def getTopics(zkClient: KafkaZkClient, opts: TopicCommandOptions): Seq[String] = {
+    val allTopics = zkClient.getAllTopicsInCluster.sorted
     if (opts.options.has(opts.topicOpt)) {
       val topicsSpec = opts.options.valueOf(opts.topicOpt)
       val topicsFilter = new Whitelist(topicsSpec)
@@ -91,23 +91,24 @@ object TopicCommand extends Logging {
       allTopics
   }
 
-  def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+  def createTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
     val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
     val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
     if (Topic.hasCollisionChars(topic))
       println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
+    val adminZkClient = new AdminZkClient(zkClient)
     try {
       if (opts.options.has(opts.replicaAssignmentOpt)) {
         val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
+        adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, configs, update = false)
       } else {
         CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
         val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
         val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                             else RackAwareMode.Enforced
-        AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
+        adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)
       }
       println("Created topic \"%s\".".format(topic))
     } catch  {
@@ -115,15 +116,16 @@ object TopicCommand extends Logging {
     }
   }
 
-  def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
-    val topics = getTopics(zkUtils, opts)
+  def alterTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
     if (topics.isEmpty && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
           opts.options.valueOf(opts.zkConnectOpt)))
     }
+    val adminZkClient = new AdminZkClient(zkClient)
     topics.foreach { topic =>
-      val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+      val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
       if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
         println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
         println("         Going forward, please use kafka-configs.sh for this functionality")
@@ -133,7 +135,7 @@ object TopicCommand extends Logging {
         // compile the final set of configs
         configs ++= configsToBeAdded
         configsToBeDeleted.foreach(config => configs.remove(config))
-        AdminUtils.changeTopicConfig(zkUtils, topic, configs)
+        adminZkClient.changeTopicConfig(topic, configs)
         println("Updated config for topic \"%s\".".format(topic))
       }
 
@@ -144,7 +146,7 @@ object TopicCommand extends Logging {
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +
           "logic or ordering of the messages will be affected")
         val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
-        val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+        val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
           case (topicPartition, replicas) => topicPartition.partition -> replicas
         }
         if (existingAssignment.isEmpty)
@@ -155,17 +157,17 @@ object TopicCommand extends Logging {
           val partitionList = replicaAssignmentString.split(",").drop(startPartitionId)
           AdminUtils.parseReplicaAssignment(partitionList.mkString(","), startPartitionId)
         }
-        val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
-        AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers, nPartitions, newAssignment)
+        val allBrokers = adminZkClient.getBrokerMetadatas()
+        adminZkClient.addPartitions(topic, existingAssignment, allBrokers, nPartitions, newAssignment)
         println("Adding partitions succeeded!")
       }
     }
   }
 
-  def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
-    val topics = getTopics(zkUtils, opts)
+  def listTopics(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
     for(topic <- topics) {
-      if (zkUtils.isTopicMarkedForDeletion(topic)) {
+      if (zkClient.isTopicMarkedForDeletion(topic)) {
         println("%s - marked for deletion".format(topic))
       } else {
         println(topic)
@@ -173,8 +175,8 @@ object TopicCommand extends Logging {
     }
   }
 
-  def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
-    val topics = getTopics(zkUtils, opts)
+  def deleteTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
     val ifExists = opts.options.has(opts.ifExistsOpt)
     if (topics.isEmpty && !ifExists) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
@@ -185,12 +187,12 @@ object TopicCommand extends Logging {
         if (Topic.isInternal(topic)) {
           throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
         } else {
-          zkUtils.createPersistentPath(getDeleteTopicPath(topic))
+          zkClient.createDeleteTopicPath(topic)
           println("Topic %s is marked for deletion.".format(topic))
           println("Note: This will have no impact if delete.topic.enable is not set to true.")
         }
       } catch {
-        case _: ZkNodeExistsException =>
+        case _: NodeExistsException =>
           println("Topic %s is already marked for deletion.".format(topic))
         case e: AdminOperationException =>
           throw e
@@ -200,21 +202,23 @@ object TopicCommand extends Logging {
     }
   }
 
-  def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
-    val topics = getTopics(zkUtils, opts)
+  def describeTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
     val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
     val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
     val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
-    val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
+    val liveBrokers = zkClient.getAllBrokersInCluster.map(_.id).toSet
+    val adminZkClient = new AdminZkClient(zkClient)
+
     for (topic <- topics) {
-      zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
+       zkClient.getPartitionAssignmentForTopics(immutable.Set(topic)).get(topic) match {
         case Some(topicPartitionAssignment) =>
           val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
           val describePartitions: Boolean = !reportOverriddenConfigs
           val sortedPartitions = topicPartitionAssignment.toSeq.sortBy(_._1)
-          val markedForDeletion = zkUtils.isTopicMarkedForDeletion(topic)
+          val markedForDeletion = zkClient.isTopicMarkedForDeletion(topic)
           if (describeConfigs) {
-            val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
+            val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic).asScala
             if (!reportOverriddenConfigs || configs.nonEmpty) {
               val numPartitions = topicPartitionAssignment.size
               val replicationFactor = topicPartitionAssignment.head._2.size
@@ -226,8 +230,10 @@ object TopicCommand extends Logging {
           }
           if (describePartitions) {
             for ((partitionId, assignedReplicas) <- sortedPartitions) {
-              val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
-              val leader = zkUtils.getLeaderForPartition(topic, partitionId)
+              val leaderIsrEpoch = zkClient.getTopicPartitionState(new TopicPartition(topic, partitionId))
+              val inSyncReplicas = if (leaderIsrEpoch.isEmpty) Seq.empty[Int] else leaderIsrEpoch.get.leaderAndIsr.isr
+              val leader = if (leaderIsrEpoch.isEmpty) None else Option(leaderIsrEpoch.get.leaderAndIsr.leader)
+
               if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
                   (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
                   (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {

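In describeTopic, the leader and ISR now come from a single getTopicPartitionState lookup instead of two separate ZkUtils reads. The hunk above, rewritten with Option combinators for readability (behaviour is the same):

    val stateOpt = zkClient.getTopicPartitionState(new TopicPartition(topic, partitionId))
    val inSyncReplicas = stateOpt.map(_.leaderAndIsr.isr).getOrElse(Seq.empty[Int])
    val leader = stateOpt.map(_.leaderAndIsr.leader)
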
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 935fade..8f69000 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,6 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
@@ -41,11 +42,12 @@ import scala.collection.JavaConverters._
 class AdminManager(val config: KafkaConfig,
                    val metrics: Metrics,
                    val metadataCache: MetadataCache,
-                   val zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
+                   val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
 
   this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
 
   private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
+  private val adminZkClient = new AdminZkClient(zkClient)
 
   private val createTopicPolicy =
     Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
@@ -101,7 +103,7 @@ class AdminManager(val config: KafkaConfig,
 
         createTopicPolicy match {
           case Some(policy) =>
-            AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false)
+            adminZkClient.validateCreateOrUpdateTopic(topic, assignments, configs, update = false)
 
             // Use `null` for unset fields in the public API
             val numPartitions: java.lang.Integer =
@@ -114,13 +116,13 @@ class AdminManager(val config: KafkaConfig,
               arguments.configs))
 
             if (!validateOnly)
-              AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+              adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignments, configs, update = false)
 
           case None =>
             if (validateOnly)
-              AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false)
+              adminZkClient.validateCreateOrUpdateTopic(topic, assignments, configs, update = false)
             else
-              AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+              adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignments, configs, update = false)
         }
         CreatePartitionsMetadata(topic, assignments, ApiError.NONE)
       } catch {
@@ -165,7 +167,7 @@ class AdminManager(val config: KafkaConfig,
     // 1. map over topics calling the asynchronous delete
     val metadata = topics.map { topic =>
         try {
-          AdminUtils.deleteTopic(zkUtils, topic)
+          adminZkClient.deleteTopic(topic)
           DeleteTopicMetadata(topic, Errors.NONE)
         } catch {
           case _: TopicAlreadyMarkedForDeletionException =>
@@ -203,8 +205,8 @@ class AdminManager(val config: KafkaConfig,
                        listenerName: ListenerName,
                        callback: Map[String, ApiError] => Unit): Unit = {
 
-    val reassignPartitionsInProgress = zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
-    val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
+    val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
+    val allBrokers = adminZkClient.getBrokerMetadatas()
     val allBrokerIds = allBrokers.map(_.id)
 
     // 1. map over topics creating assignment and calling AdminUtils
@@ -215,7 +217,7 @@ class AdminManager(val config: KafkaConfig,
         if (reassignPartitionsInProgress)
           throw new ReassignmentInProgressException("A partition reassignment is in progress.")
 
-        val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+        val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
           case (topicPartition, replicas) => topicPartition.partition -> replicas
         }
         if (existingAssignment.isEmpty)
@@ -247,7 +249,7 @@ class AdminManager(val config: KafkaConfig,
           }.toMap
         }
 
-        val updatedReplicaAssignment = AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers,
+        val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers,
           newPartition.totalCount, reassignment, validateOnly = validateOnly)
         CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
       } catch {
@@ -306,7 +308,7 @@ class AdminManager(val config: KafkaConfig,
             val topic = resource.name
             Topic.validate(topic)
             // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
-            val topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+            val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
             val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
             createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name))
 
@@ -350,19 +352,19 @@ class AdminManager(val config: KafkaConfig,
 
             alterConfigPolicy match {
               case Some(policy) =>
-                AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+                adminZkClient.validateTopicConfig(topic, properties)
 
                 val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
                 policy.validate(new AlterConfigPolicy.RequestMetadata(
                   new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
 
                 if (!validateOnly)
-                  AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+                  adminZkClient.changeTopicConfig(topic, properties)
               case None =>
                 if (validateOnly)
-                  AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+                  adminZkClient.validateTopicConfig(topic, properties)
                 else
-                  AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+                  adminZkClient.changeTopicConfig(topic, properties)
             }
             resource -> ApiError.NONE
           case resourceType =>

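AdminManager now receives the KafkaZkClient directly and builds its own private AdminZkClient, so the broker wires a single ZooKeeper-facing dependency. Construction, as in the KafkaServer hunk further down (config, metrics and metadataCache are the broker's existing instances):

    adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
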
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 6392723..457742d 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -26,7 +26,7 @@ import scala.collection._
 import scala.collection.JavaConverters._
 import kafka.admin.AdminUtils
 import kafka.utils.json.JsonObject
-import kafka.zk.KafkaZkClient
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.security.scram.ScramMechanism
 import org.apache.kafka.common.utils.Time
@@ -84,11 +84,11 @@ object ConfigEntityName {
  * on startup where a change might be missed between the initial config load and registering for change notifications.
  *
  */
-class DynamicConfigManager(private val oldZkUtils: ZkUtils,
-                           private val zkClient: KafkaZkClient,
+class DynamicConfigManager(private val zkClient: KafkaZkClient,
                            private val configHandlers: Map[String, ConfigHandler],
                            private val changeExpirationMs: Long = 15*60*1000,
                            private val time: Time = Time.SYSTEM) extends Logging {
+  val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(json: String) = {
@@ -120,7 +120,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
         throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
       }
 
-      val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, entityType, entity)
+      val entityConfig = adminZkClient.fetchEntityConfig(entityType, entity)
       info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
       configHandlers(entityType).processConfigChanges(entity, entityConfig)
 
@@ -141,7 +141,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
       }
       val fullSanitizedEntityName = entityPath.substring(index + 1)
 
-      val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, rootEntityType, fullSanitizedEntityName)
+      val entityConfig = adminZkClient.fetchEntityConfig(rootEntityType, fullSanitizedEntityName)
       val loggableConfig = entityConfig.asScala.map {
         case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v)
       }
@@ -163,14 +163,14 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
     // Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides
     configHandlers.foreach {
       case (ConfigType.User, handler) =>
-        AdminUtils.fetchAllEntityConfigs(oldZkUtils, ConfigType.User).foreach {
+        adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach {
           case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
         }
-        AdminUtils.fetchAllChildEntityConfigs(oldZkUtils, ConfigType.User, ConfigType.Client).foreach {
+        adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach {
           case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
         }
       case (configType, handler) =>
-        AdminUtils.fetchAllEntityConfigs(oldZkUtils, configType).foreach {
+        adminZkClient.fetchAllEntityConfigs(configType).foreach {
           case (entityName, properties) => handler.processConfigChanges(entityName, properties)
         }
     }

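With both the bootstrap pass and the notification handlers reading configs through AdminZkClient, the manager no longer needs the legacy ZkUtils handle and its constructor loses that parameter. Wiring, as in the KafkaServer hunk below:

    dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
    dynamicConfigManager.startup()
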
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index de56986..a31b6c3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,7 +29,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
 import kafka.common.{OffsetAndMetadata, OffsetMetadata}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.controller.{KafkaController}
+import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
@@ -37,8 +37,8 @@ import kafka.network.RequestChannel
 import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import kafka.zk.KafkaZkClient
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -71,7 +71,6 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val groupCoordinator: GroupCoordinator,
                 val txnCoordinator: TransactionCoordinator,
                 val controller: KafkaController,
-                val zkUtils: ZkUtils,
                 val zkClient: KafkaZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
@@ -84,6 +83,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 time: Time) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
+  val adminZkClient = new AdminZkClient(zkClient)
 
   def close() {
     info("Shutdown complete.")
@@ -829,7 +829,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                           replicationFactor: Int,
                           properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
     try {
-      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
+      adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
       info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
         .format(topic, numPartitions, replicationFactor))
       new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1812eb0..7f61479 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -242,7 +242,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix)
         kafkaController.startup()
 
-        adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
+        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
 
         /* start group coordinator */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
@@ -263,7 +263,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
-          kafkaController, zkUtils, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
           brokerTopicStats, clusterId, time)
 
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
@@ -278,7 +278,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                                                            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
         // Create the config manager. start listening to notifications
-        dynamicConfigManager = new DynamicConfigManager(zkUtils, zkClient, dynamicConfigHandlers)
+        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
         /* tell everyone we are alive */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/AdminZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
new file mode 100644
index 0000000..e00b8e6
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import java.util.Properties
+
+import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
+import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.log.LogConfig
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
+import org.apache.zookeeper.KeeperException.NodeExistsException
+
+import scala.collection.{Map, Seq}
+
+class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Creates the topic with given configuration
+   * @param topic topic name to create
+   * @param partitions  Number of partitions to be set
+   * @param replicationFactor Replication factor
+   * @param topicConfig  topic configs
+   * @param rackAwareMode
+   */
+  def createTopic(topic: String,
+                  partitions: Int,
+                  replicationFactor: Int,
+                  topicConfig: Properties = new Properties,
+                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
+    val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
+    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
+    createOrUpdateTopicPartitionAssignmentPathInZK(topic, replicaAssignment, topicConfig)
+  }
+
+  /**
+   * Gets broker metadata list
+   * @param rackAwareMode
+   * @param brokerList
+   * @return
+   */
+  def getBrokerMetadatas(rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
+                         brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+    val allBrokers = zkClient.getAllBrokersInCluster
+    val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
+    val brokersWithRack = brokers.filter(_.rack.nonEmpty)
+    if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
+      throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
+        " to make replica assignment without rack information.")
+    }
+    val brokerMetadatas = rackAwareMode match {
+      case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
+      case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
+        brokers.map(broker => BrokerMetadata(broker.id, None))
+      case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
+    }
+    brokerMetadatas.sortBy(_.id)
+  }
+
+  /**
+   * Creates or Updates the partition assignment for a given topic
+   * @param topic
+   * @param partitionReplicaAssignment
+   * @param config
+   * @param update
+   */
+  def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String,
+                                                     partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                                     config: Properties = new Properties,
+                                                     update: Boolean = false) {
+    validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
+
+    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+    if (!update) {
+      // write out the config if there is any, this isn't transactional with the partition assignments
+      zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
+    }
+
+    // create the partition assignment
+    writeTopicPartitionAssignment(topic, partitionReplicaAssignment, update)
+  }
+
+  /**
+   * Validate method to use before the topic creation or update
+   * @param topic
+   * @param partitionReplicaAssignment
+   * @param config
+   * @param update
+   */
+  def validateCreateOrUpdateTopic(topic: String,
+                                  partitionReplicaAssignment: Map[Int, Seq[Int]],
+                                  config: Properties,
+                                  update: Boolean): Unit = {
+    // validate arguments
+    Topic.validate(topic)
+
+    if (!update) {
+      if (zkClient.topicExists(topic))
+        throw new TopicExistsException(s"Topic '$topic' already exists.")
+      else if (Topic.hasCollisionChars(topic)) {
+        val allTopics = zkClient.getAllTopicsInCluster
+        // check again in case the topic was created in the meantime, otherwise the
+        // topic could potentially collide with itself
+        if (allTopics.contains(topic))
+          throw new TopicExistsException(s"Topic '$topic' already exists.")
+        val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _))
+        if (collidingTopics.nonEmpty) {
+          throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}")
+        }
+      }
+    }
+
+    if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1)
+      throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
+
+    partitionReplicaAssignment.values.foreach(reps =>
+      if (reps.size != reps.toSet.size)
+        throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
+    )
+
+    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+    if (!update)
+      LogConfig.validate(config)
+  }
+
+  private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
+    try {
+      val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap
+
+      if (!update) {
+        info("Topic creation " + assignment)
+        zkClient.createTopicAssignment(topic, assignment)
+      } else {
+        info("Topic update " + assignment)
+        zkClient.setTopicAssignment(topic, assignment)
+      }
+      debug("Updated path %s with %s for replica assignment".format(TopicZNode.path(topic), assignment))
+    } catch {
+      case _: NodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
+      case e2: Throwable => throw new AdminOperationException(e2.toString)
+    }
+  }
+
+
+  /**
+   * Creates a delete path for a given topic
+   * @param topic
+   */
+  def deleteTopic(topic: String) {
+    if (zkClient.topicExists(topic)) {
+      try {
+        zkClient.createDeleteTopicPath(topic)
+      } catch {
+        case _: NodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+          "topic %s is already marked for deletion".format(topic))
+        case e: Throwable => throw new AdminOperationException(e.getMessage)
+       }
+    } else {
+      throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
+    }
+  }
+
+  /**
+  * Add partitions to existing topic with optional replica assignment
+  *
+  * @param topic Topic for adding partitions to
+  * @param existingAssignment A map from partition id to its assigned replicas
+  * @param allBrokers All brokers in the cluster
+  * @param numPartitions Number of partitions to be set
+  * @param replicaAssignment Manual replica assignment, or none
+  * @param validateOnly If true, validate the parameters without actually adding the partitions
+  * @return the updated replica assignment
+  */
+  def addPartitions(topic: String,
+                    existingAssignment: Map[Int, Seq[Int]],
+                    allBrokers: Seq[BrokerMetadata],
+                    numPartitions: Int = 1,
+                    replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
+                    validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
+    val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
+      throw new AdminOperationException(
+        s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
+          s"Assignment: $existingAssignment"))
+
+    val partitionsToAdd = numPartitions - existingAssignment.size
+    if (partitionsToAdd <= 0)
+      throw new InvalidPartitionsException(
+        s"The number of partitions for a topic can only be increased. " +
+          s"Topic $topic currently has ${existingAssignment.size} partitions, " +
+          s"$numPartitions would not be an increase.")
+
+    replicaAssignment.foreach { proposedReplicaAssignment =>
+      validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0.size,
+        allBrokers.map(_.id).toSet)
+    }
+
+    val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
+      val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
+      AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
+        startIndex, existingAssignment.size)
+    }
+    val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions
+    if (!validateOnly) {
+      info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
+        s"$proposedAssignmentForNewPartitions.")
+      // add the combined new list
+      createOrUpdateTopicPartitionAssignmentPathInZK(topic, proposedAssignment, update = true)
+    }
+    proposedAssignment
+
+  }
+
+  private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
+                                        expectedReplicationFactor: Int,
+                                        availableBrokerIds: Set[Int]): Unit = {
+
+    replicaAssignment.foreach { case (partitionId, replicas) =>
+      if (replicas.isEmpty)
+        throw new InvalidReplicaAssignmentException(
+          s"Cannot have replication factor of 0 for partition id $partitionId.")
+      if (replicas.size != replicas.toSet.size)
+        throw new InvalidReplicaAssignmentException(
+          s"Duplicate brokers not allowed in replica assignment: " +
+            s"${replicas.mkString(", ")} for partition id $partitionId.")
+      if (!replicas.toSet.subsetOf(availableBrokerIds))
+        throw new BrokerNotAvailableException(
+          s"Some brokers specified for partition id $partitionId are not available. " +
+            s"Specified brokers: ${replicas.mkString(", ")}, " +
+            s"available brokers: ${availableBrokerIds.mkString(", ")}.")
+    }
+    val badRepFactors = replicaAssignment.collect {
+      case (partition, replicas) if replicas.size != expectedReplicationFactor => partition -> replicas.size
+    }
+    if (badRepFactors.nonEmpty) {
+      val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId }
+      val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId }
+      val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
+      throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " +
+        s"partition 0 has ${expectedReplicationFactor} while partitions [${partitions.mkString(", ")}] have " +
+        s"replication factors [${repFactors.mkString(", ")}], respectively.")
+    }
+  }
+
+  /**
+   * Change the configs for a given entityType and entityName
+   * @param entityType the config entity type (topics/clients/users/brokers)
+   * @param entityName the name of the entity whose configs are being changed
+   * @param configs the final set of configs to apply to the entity
+   */
+  def changeConfigs(entityType: String, entityName: String, configs: Properties): Unit = {
+
+    def parseBroker(broker: String): Int = {
+      try broker.toInt
+      catch {
+        case _: NumberFormatException =>
+          throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
+      }
+    }
+
+    entityType match {
+      case ConfigType.Topic => changeTopicConfig(entityName, configs)
+      case ConfigType.Client => changeClientIdConfig(entityName, configs)
+      case ConfigType.User => changeUserOrUserClientIdConfig(entityName, configs)
+      case ConfigType.Broker => changeBrokerConfig(Seq(parseBroker(entityName)), configs)
+      case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.User}, ${ConfigType.Broker}")
+    }
+  }
+
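+  // Usage sketch (editor's illustration): setting a topic-level override via
+  // the generic entry point; equivalent to calling changeTopicConfig directly.
+  //
+  //   val props = new Properties()
+  //   props.put("retention.ms", "86400000")
+  //   adminZkClient.changeConfigs(ConfigType.Topic, "my-topic", props)
+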
+  /**
+   * Update the config for a client and create a change notification so the change will propagate to other brokers.
+   * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
+   * and <user> configs are not specified.
+   *
+   * @param sanitizedClientId The sanitized clientId for which configs are being changed
+   * @param configs The final set of configs that will be applied to the client. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeClientIdConfig(sanitizedClientId: String, configs: Properties) {
+    DynamicConfig.Client.validate(configs)
+    changeEntityConfig(ConfigType.Client, sanitizedClientId, configs)
+  }
+
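+  // Usage sketch (editor's illustration): applying a producer quota override
+  // for one client id (assumed already sanitized). "producer_byte_rate" is one
+  // of the keys accepted by DynamicConfig.Client.validate.
+  //
+  //   val quotas = new Properties()
+  //   quotas.put("producer_byte_rate", "1048576")
+  //   adminZkClient.changeClientIdConfig("clientA", quotas)
+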
+  /**
+   * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
+   * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
+   * value to be applied if a more specific override is not configured.
+   *
+   * @param sanitizedEntityName <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
+   * @param configs The final set of configs that will be applied to the user and/or client. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties) {
+    if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
+      DynamicConfig.Client.validate(configs)
+    else
+      DynamicConfig.User.validate(configs)
+    changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
+  }
+
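+  // Usage sketch (editor's illustration): the entity name encodes the scope.
+  // A bare principal updates the <user> config, appending /clients/<clientId>
+  // updates the <user, clientId> config, and <default> stands in for either
+  // component. The quota Properties values are assumptions.
+  //
+  //   adminZkClient.changeUserOrUserClientIdConfig("user1", userQuotas)
+  //   adminZkClient.changeUserOrUserClientIdConfig("user1/clients/clientA", clientQuotas)
+  //   adminZkClient.changeUserOrUserClientIdConfig("<default>/clients/<default>", defaultQuotas)
+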
+  /**
+   * Validates the topic configs
+   * @param topic the topic whose configs are being validated
+   * @param configs the topic-level configs to validate
+   */
+  def validateTopicConfig(topic: String, configs: Properties): Unit = {
+    Topic.validate(topic)
+    if (!zkClient.topicExists(topic))
+      throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
+    // validate the topic-level config overrides
+    LogConfig.validate(configs)
+  }
+
+  /**
+   * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+   *
+   * @param topic The topic for which configs are being changed
+   * @param configs The final set of configs that will be applied to the topic. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done prior to invoking this API
+   *
+   */
+  def changeTopicConfig(topic: String, configs: Properties): Unit = {
+    validateTopicConfig(topic, configs)
+    changeEntityConfig(ConfigType.Topic, topic, configs)
+  }
+
+  /**
+   * Override the broker config on some set of brokers. These overrides will be persisted between sessions, and will
+   * override any defaults entered in the broker's config files
+   *
+   * @param brokers The list of brokers to apply config changes to
+   * @param configs The config to change, as properties
+   */
+  def changeBrokerConfig(brokers: Seq[Int], configs: Properties): Unit = {
+    DynamicConfig.Broker.validate(configs)
+    brokers.foreach { broker => changeEntityConfig(ConfigType.Broker, broker.toString, configs) }
+  }
+
+  private def changeEntityConfig(rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
+    val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
+    zkClient.setOrCreateEntityConfigs(rootEntityType, fullSanitizedEntityName, configs)
+
+    // create the change notification
+    zkClient.createConfigChangeNotification(sanitizedEntityPath)
+  }
+
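+  // Note (editor's): the update above is two independent ZooKeeper writes. The
+  // config znode is set (or created) first, and only then is a
+  // persistent-sequential notification child created, which is what prompts
+  // each broker's DynamicConfigManager to re-read the entity. Illustrative
+  // resulting znodes for a topic override (sequence number made up):
+  //
+  //   /config/topics/my-topic                  -> {"version":1,"config":{...}}
+  //   /config/changes/config_change_0000000007 -> {"version":2,"entity_path":"topics/my-topic"}
+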
+  /**
+   * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk.
+   * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
+   * @param rootEntityType entity type
+   * @param sanitizedEntityName entity name
+   * @return the entity's configs, or empty Properties if none are set
+   */
+  def fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String): Properties = {
+    zkClient.getEntityConfigs(rootEntityType, sanitizedEntityName)
+  }
+
+  /**
+   * Gets all topic configs
+   * @return a map from topic name to its configs
+   */
+  def getAllTopicConfigs(): Map[String, Properties] =
+    zkClient.getAllTopicsInCluster.map(topic => (topic, fetchEntityConfig(ConfigType.Topic, topic))).toMap
+
+  /**
+   * Gets all the entity configs for a given entityType
+   * @param entityType entity type
+   * @return a map from entity name to its configs
+   */
+  def fetchAllEntityConfigs(entityType: String): Map[String, Properties] =
+    zkClient.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(entityType, entity))).toMap
+
+  /**
+   * Gets all the entity configs for a given childEntityType
+   * @param rootEntityType the root entity type (e.g. users)
+   * @param childEntityType the child entity type (e.g. clients)
+   * @return a map from <root-entity>/<childEntityType>/<child-entity> path to its configs
+   */
+  def fetchAllChildEntityConfigs(rootEntityType: String, childEntityType: String): Map[String, Properties] = {
+    def entityPaths(rootPath: Option[String]): Seq[String] = {
+      val root = rootPath match {
+        case Some(path) => rootEntityType + '/' + path
+        case None => rootEntityType
+      }
+      val entityNames = zkClient.getAllEntitiesWithConfig(root)
+      rootPath match {
+        case Some(path) => entityNames.map(entityName => path + '/' + entityName)
+        case None => entityNames
+      }
+    }
+    entityPaths(None)
+      .flatMap(entity => entityPaths(Some(entity + '/' + childEntityType)))
+      .map(entityPath => (entityPath, fetchEntityConfig(rootEntityType, entityPath))).toMap
+  }
+
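+  // Usage sketch (editor's illustration): fetching every <user, client-id>
+  // override in one call. With users u1 and u2 each overriding client c1, the
+  // returned keys would be "u1/clients/c1" and "u2/clients/c1".
+  //
+  //   val perUserClient = adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client)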
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 24d7ba9..b419654 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
 import kafka.log.LogConfig
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
@@ -33,8 +33,8 @@ import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException}
 
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -168,7 +168,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       configResponse.resultCode match {
         case Code.OK =>
           val overrides = ConfigEntityZNode.decode(configResponse.data)
-          val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
+          val logConfig = LogConfig.fromProps(config, overrides)
           logConfigs.put(topic, logConfig)
         case Code.NONODE =>
           val logConfig = LogConfig.fromProps(config, new Properties)
@@ -180,32 +180,100 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Get entity configs for a given entity name
+   * @param rootEntityType entity type
+   * @param sanitizedEntityName entity name
+   * @return the entity's configs, or empty Properties if the config znode does not exist
+   */
+  def getEntityConfigs(rootEntityType: String, sanitizedEntityName: String): Properties = {
+    val getDataRequest = GetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName))
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+
+    getDataResponse.resultCode match {
+      case Code.OK =>
+        ConfigEntityZNode.decode(getDataResponse.data)
+      case Code.NONODE => new Properties()
+      case _ => throw getDataResponse.resultException.get
+    }
+  }
+
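+  // Note (editor's): a missing entity comes back as empty Properties rather
+  // than an error, so callers can treat "no overrides" uniformly, e.g.
+  //
+  //   val overrides = zkClient.getEntityConfigs(ConfigType.Topic, "my-topic")
+  //   if (overrides.isEmpty) println("no topic-level overrides set")
+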
+  /**
+   * Sets or creates the entity znode path with the given configs depending
+   * on whether it already exists or not.
+   * @param rootEntityType entity type
+   * @param sanitizedEntityName entity name
+   * @param config the configs to write
+   * @throws KeeperException if there is an error while setting or creating the znode
+   */
+  def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+
+    def set(configData: Array[Byte]): SetDataResponse = {
+      val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName),
+        configData, ZkVersion.NoVersion)
+      retryRequestUntilConnected(setDataRequest)
+    }
+
+    def create(configData: Array[Byte]): Unit = {
+      val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
+      createRecursive(path, configData)
+    }
+
+    val configData = ConfigEntityZNode.encode(config)
+
+    val setDataResponse = set(configData)
+    setDataResponse.resultCode match {
+      case Code.NONODE => create(configData)
+      case _ => setDataResponse.resultException.foreach(e => throw e)
+    }
+  }
+
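+  // Note (editor's): the write is optimistic. The common case (znode already
+  // exists) costs a single setData round trip; only a NONODE miss falls back
+  // to a recursive create of the path, e.g.
+  //
+  //   zkClient.setOrCreateEntityConfigs(ConfigType.Broker, "0", props)
+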
+  /**
+   * Returns all the entities for a given entityType
+   * @param entityType entity type
+   * @return List of all entity names
+   */
+  def getAllEntitiesWithConfig(entityType: String): Seq[String] = {
+    getChildren(ConfigEntityTypeZNode.path(entityType))
+  }
+
+  /**
+   * Creates config change notification
+   * @param sanitizedEntityPath the sanitized entity path to write into the notification
+   * @throws KeeperException if there is an error while setting or creating the znode
+   */
+  def createConfigChangeNotification(sanitizedEntityPath: String): Unit = {
+    val path = ConfigEntityChangeNotificationSequenceZNode.createPath
+    val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createResponse = retryRequestUntilConnected(createRequest)
+    if (createResponse.resultCode != Code.OK) {
+      createResponse.resultException.foreach(e => throw e)
+    }
+  }
+
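+  // Note (editor's): PERSISTENT_SEQUENTIAL makes ZooKeeper append a
+  // monotonically increasing 10-digit suffix to the "config_change_" prefix,
+  // so each call produces a new child (sequence number illustrative):
+  //
+  //   createConfigChangeNotification("topics/my-topic")
+  //   // creates e.g. /config/changes/config_change_0000000042
+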
+  /**
    * Gets all brokers in the cluster.
    * @return sequence of brokers in the cluster.
    */
   def getAllBrokersInCluster: Seq[Broker] = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
-    getChildrenResponse.resultCode match {
-      case Code.OK =>
-        val brokerIds = getChildrenResponse.children.map(_.toInt)
-        val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
-        val getDataResponses = retryRequestsUntilConnected(getDataRequests)
-        getDataResponses.flatMap { getDataResponse =>
-          val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
-          getDataResponse.resultCode match {
-            case Code.OK =>
-              Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
-            case Code.NONODE => None
-            case _ => throw getDataResponse.resultException.get
-          }
-        }
-      case Code.NONODE =>
-        Seq.empty
-      case _ =>
-        throw getChildrenResponse.resultException.get
+    val brokerIds = getSortedBrokerList
+    val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+    getDataResponses.flatMap { getDataResponse =>
+      val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
+      getDataResponse.resultCode match {
+        case Code.OK =>
+          Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+        case Code.NONODE => None
+        case _ => throw getDataResponse.resultException.get
+      }
     }
   }
 
+
+  /**
+   * Gets the sorted list of broker ids
+   */
+  def getSortedBrokerList(): Seq[Int] =
+    getChildren(BrokerIdsZNode.path).map(_.toInt).sorted
+
   /**
    * Gets all topics in the cluster.
    * @return sequence of topics in the cluster.
@@ -221,6 +289,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Checks whether the topic exists
+   * @param topicName the topic to check
+   * @return true if topic exists else false
+   */
+  def topicExists(topicName: String): Boolean = {
+    pathExists(TopicZNode.path(topicName))
+  }
+
+  /**
    * Sets the topic znode with the given assignment.
    * @param topic the topic whose assignment is being set.
    * @param assignment the partition to replica mapping to set for the given topic
@@ -232,6 +309,29 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Sets the topic znode with the given assignment.
+   * @param topic the topic whose assignment is being set.
+   * @param assignment the partition to replica mapping to set for the given topic
+   * @throws KeeperException if there is an error while setting assignment
+   */
+  def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]): Unit = {
+    val setDataResponse = setTopicAssignmentRaw(topic, assignment)
+    if (setDataResponse.resultCode != Code.OK) {
+      setDataResponse.resultException.foreach(e => throw e)
+    }
+  }
+
+  /**
+   * Create the topic znode with the given assignment.
+   * @param topic the topic whose assignment is being set.
+   * @param assignment the partition to replica mapping to set for the given topic
+   * @throws KeeperException if there is an error while creating assignment
+   */
+  def createTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]): Unit = {
+    createRecursive(TopicZNode.path(topic), TopicZNode.encode(assignment))
+  }
+
+  /**
    * Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path.
    * @return sequence of znode names and not the absolute znode path.
    */
@@ -271,7 +371,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteLogDirEventNotifications(getChildrenResponse.children)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
+      getChildrenResponse.resultException.foreach(e => throw e)
     }
   }
 
@@ -305,6 +405,40 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Gets the partition assignments for the given topics.
+   * @param topics the topics whose partition assignments we wish to get.
+   * @return the partition assignment for each of the given topics.
+   */
+  def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, Seq[Int]]] = {
+    val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
+    getDataResponses.flatMap { getDataResponse =>
+      val topic = getDataResponse.ctx.get.asInstanceOf[String]
+      if (getDataResponse.resultCode == Code.OK) {
+        val partitionMap = TopicZNode.decode(topic, getDataResponse.data).map { case (k, v) => (k.partition, v) }
+        Map(topic -> partitionMap)
+      } else if (getDataResponse.resultCode == Code.NONODE) {
+        Map.empty[String, Map[Int, Seq[Int]]]
+      } else {
+        throw getDataResponse.resultException.get
+      }
+    }.toMap
+  }
+
+  /**
+   * Gets the partition numbers for the given topics
+   * @param topics the topics whose partitions we wish to get.
+   * @return the sorted partition ids for each of the given topics.
+   */
+  def getPartitionsForTopics(topics: Set[String]): Map[String, Seq[Int]] = {
+    getPartitionAssignmentForTopics(topics).map { case (topic, partitionMap) =>
+      topic -> partitionMap.keys.toSeq.sorted
+    }
+  }
+
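+  // Usage sketch (editor's illustration): for a 3-partition topic "t" this
+  // returns the partition ids in ascending order.
+  //
+  //   zkClient.getPartitionsForTopics(Set("t"))   // Map("t" -> Seq(0, 1, 2))
+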
+  /**
    * Gets the partition count for a given topic
    * @param topic The topic to get partition count for.
    * @return  optional integer that is Some if the topic exists and None otherwise.
@@ -318,6 +452,16 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Gets the assigned replicas for a specific topic and partition
+   * @param topicPartition the TopicPartition to get assigned replicas for
+   * @return List of assigned replicas
+   */
+  def getReplicasForPartition(topicPartition: TopicPartition): Seq[Int] = {
+    val topicData = getReplicaAssignmentForTopics(Set(topicPartition.topic))
+    topicData.getOrElse(topicPartition, Seq.empty)
+  }
+
+  /**
    * Gets the data and version at the given zk path
    * @param path zk node path
    * @return A tuple of 2 elements, where first element is zk node data as string
@@ -413,6 +557,25 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Creates the delete topic znode.
+   * @param topicName topic name
+   * @throws KeeperException if there is an error while setting or creating the znode
+   */
+  def createDeleteTopicPath(topicName: String): Unit = {
+    createRecursive(DeleteTopicsTopicZNode.path(topicName))
+  }
+
+  /**
+   * Checks if topic is marked for deletion
+   * @param topic the topic to check
+   * @return true if topic is marked for deletion, else false
+   */
+  def isTopicMarkedForDeletion(topic: String): Boolean = {
+    pathExists(DeleteTopicsTopicZNode.path(topic))
+  }
+
+  /**
    * Get all topics marked for deletion.
    * @return sequence of topics marked for deletion.
    */
@@ -479,6 +642,21 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Creates the partition reassignment znode with the given reassignment.
+   * @param reassignment the reassignment to set on the reassignment znode.
+   * @throws KeeperException if there is an error while setting or creating the znode
+   */
+  def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]]): Unit = {
+    val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
+      acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
+    val createResponse = retryRequestUntilConnected(createRequest)
+
+    if (createResponse.resultCode != Code.OK) {
+      throw createResponse.resultException.get
+    }
+  }
+
+  /**
    * Deletes the partition reassignment znode.
    */
   def deletePartitionReassignment(): Unit = {
@@ -487,6 +665,22 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Checks if reassign partitions is in progress
+   * @return true if reassign partitions is in progress, else false
+   */
+  def reassignPartitionsInProgress(): Boolean = {
+    pathExists(ReassignPartitionsZNode.path)
+  }
+
+  /**
+   * Gets the partitions that are currently being reassigned.
+   * @return ReassignedPartitionsContext for each partition being reassigned.
+   */
+  def getPartitionsBeingReassigned(): Map[TopicPartition, ReassignedPartitionsContext] = {
+    getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+  }
+
+  /**
    * Gets topic partition states for the given partitions.
    * @param partitions the partitions for which we want to get states.
    * @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state.
@@ -504,6 +698,35 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Gets topic partition state for the given partition.
+   * @param partition the partition for which we want to get state.
+   * @return LeaderIsrAndControllerEpoch of the partition state if exists, else None
+   */
+  def getTopicPartitionState(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+    val getDataResponse = getTopicPartitionStatesRaw(Seq(partition)).head
+    if (getDataResponse.resultCode == Code.OK) {
+      TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      None
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Gets the leader for a given partition
+   * @param partition the partition whose leader we want
+   * @return Some(leader broker id) if the partition state exists, None otherwise.
+   */
+  def getLeaderForPartition(partition: TopicPartition): Option[Int] =
+    getTopicPartitionState(partition).map(_.leaderAndIsr.leader)
+
+  /**
    * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
    * @return sequence of znode names and not the absolute znode path.
    */
@@ -543,7 +766,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
+      getChildrenResponse.resultException.foreach(e => throw e)
     }
   }
 
@@ -641,9 +864,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * Creates the required zk nodes for Acl storage
    */
   def createAclPaths(): Unit = {
-    createRecursive(AclZNode.path)
-    createRecursive(AclChangeNotificationZNode.path)
-    ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name)))
+    createRecursive(AclZNode.path, throwIfPathExists = false)
+    createRecursive(AclChangeNotificationZNode.path, throwIfPathExists = false)
+    ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name), throwIfPathExists = false))
   }
 
   /**
@@ -719,7 +942,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     if (getChildrenResponse.resultCode == Code.OK) {
       deleteAclChangeNotifications(getChildrenResponse.children)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
+      getChildrenResponse.resultException.foreach(e => throw e)
     }
   }
 
@@ -735,7 +958,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val deleteResponses = retryRequestsUntilConnected(deleteRequests)
     deleteResponses.foreach { deleteResponse =>
       if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
-        throw deleteResponse.resultException.get
+        deleteResponse.resultException.foreach(e => throw e)
       }
     }
   }
@@ -790,7 +1013,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       case _ => throw deleteResponse.resultException.get
     }
   }
-  
+
   /**
    * Deletes the zk node recursively
    * @param path
@@ -889,7 +1112,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
   }
 
-   /**
+  /**
    * Set the committed offset for a topic partition and group
    * @param group the group whose offset is being set
    * @param topicPartition the topic partition whose offset is being set
@@ -898,8 +1121,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def setOrCreateConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): Unit = {
     val setDataResponse = setConsumerOffset(group, topicPartition, offset)
     if (setDataResponse.resultCode == Code.NONODE) {
-      val createResponse = createConsumerOffset(group, topicPartition, offset)
-      createResponse.resultException.foreach(e => throw e)
+      createConsumerOffset(group, topicPartition, offset)
     } else {
       setDataResponse.resultException.foreach(e => throw e)
     }
@@ -911,17 +1133,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     retryRequestUntilConnected(setDataRequest)
   }
 
-  private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): CreateResponse = {
+  private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long) = {
     val path = ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition)
-    val createRequest = CreateRequest(path, ConsumerOffset.encode(offset), acls(path), CreateMode.PERSISTENT)
-    var createResponse = retryRequestUntilConnected(createRequest)
-    if (createResponse.resultCode == Code.NONODE) {
-      val indexOfLastSlash = path.lastIndexOf("/")
-      if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
-      createRecursive(path.substring(0, indexOfLastSlash))
-      createResponse = retryRequestUntilConnected(createRequest)
-    }
-    createResponse
+    createRecursive(path, ConsumerOffset.encode(offset))
   }
 
   /**
@@ -955,21 +1169,40 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
   }
 
-  private[zk] def createRecursive(path: String): Unit = {
-    val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
-    var createResponse = retryRequestUntilConnected(createRequest)
-    if (createResponse.resultCode == Code.NONODE) {
+  private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
+
+    def parentPath(path: String): String = {
       val indexOfLastSlash = path.lastIndexOf("/")
       if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
-      val parentPath = path.substring(0, indexOfLastSlash)
-      createRecursive(parentPath)
-      createResponse = retryRequestUntilConnected(createRequest)
-      if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+      path.substring(0, indexOfLastSlash)
+    }
+
+    def createRecursive0(path: String): Unit = {
+      val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
+      var createResponse = retryRequestUntilConnected(createRequest)
+      if (createResponse.resultCode == Code.NONODE) {
+        createRecursive0(parentPath(path))
+        createResponse = retryRequestUntilConnected(createRequest)
+        if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+          throw createResponse.resultException.get
+        }
+      } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
         throw createResponse.resultException.get
       }
-    } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
-      throw createResponse.resultException.get
     }
+
+    val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT)
+    var createResponse = retryRequestUntilConnected(createRequest)
+
+    if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
+      createResponse.resultException.foreach(e => throw e)
+    } else if (createResponse.resultCode == Code.NONODE) {
+      createRecursive0(parentPath(path))
+      createResponse = retryRequestUntilConnected(createRequest)
+      createResponse.resultException.foreach(e => throw e)
+    } else if (createResponse.resultCode != Code.NODEEXISTS)
+      createResponse.resultException.foreach(e => throw e)
   }
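+  // Note (editor's): with the default throwIfPathExists = true, an existing
+  // leaf path surfaces as a NodeExistsException; passing false makes the call
+  // idempotent for the leaf. Missing parents are always created on demand and
+  // an already-existing parent is tolerated.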
 
   private def createTopicPartition(partitions: Seq[TopicPartition]): Seq[CreateResponse] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 4c618a0..a0085cd 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -28,8 +28,6 @@ import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
 
-import scala.collection.Seq
-
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
 
 object ControllerZNode {
@@ -140,16 +138,29 @@ object ConfigEntityZNode {
     import scala.collection.JavaConverters._
     Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala))
   }
-  def decode(bytes: Array[Byte]): Option[Properties] = {
-    Json.parseBytes(bytes).map { js =>
-      val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
-      val props = new Properties()
-      configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
-      props
+  def decode(bytes: Array[Byte]): Properties = {
+    val props = new Properties()
+    if (bytes != null) {
+      Json.parseBytes(bytes).map { js =>
+        val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
+        configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
+      }
     }
+    props
   }
 }
 
+object ConfigEntityChangeNotificationZNode {
+  def path = s"${ConfigZNode.path}/changes"
+}
+
+object ConfigEntityChangeNotificationSequenceZNode {
+  val SequenceNumberPrefix = "config_change_"
+  def createPath = s"${ConfigEntityChangeNotificationZNode.path}/$SequenceNumberPrefix"
+  def encode(sanitizedEntityPath: String): Array[Byte] = Json.encodeAsBytes(Map("version" -> 2, "entity_path" -> sanitizedEntityPath))
+  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+}
+
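+// Editor's illustration: for a sanitized client-id entity, the notification
+// payload would be encoded roughly as
+//   encode("clients/clientA") => {"version":2,"entity_path":"clients/clientA"}
+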
 object IsrChangeNotificationZNode {
   def path = "/isr_change_notification"
 }