Posted to commits@kafka.apache.org by ij...@apache.org on 2019/06/16 08:30:09 UTC

[kafka] branch kafka-8545-remove-legacy-zk-utils created (now 1bc6f6a)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch kafka-8545-remove-legacy-zk-utils
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at 1bc6f6a  KAFKA-8545: Remove legacy ZkUtils

This branch includes the following new commits:

     new 1bc6f6a  KAFKA-8545: Remove legacy ZkUtils

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 01/01: KAFKA-8545: Remove legacy ZkUtils

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch kafka-8545-remove-legacy-zk-utils
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1bc6f6ae0ec6da1b75f290d56460883cabbb8f09
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun Jun 16 01:28:52 2019 -0700

    KAFKA-8545: Remove legacy ZkUtils
    
    ZkUtils is no longer used by the broker, has been
    deprecated since 2.0.0, and was never intended
    as a public API. We should remove it.
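
For callers migrating off the removed helpers, the deprecation messages in
this commit point to the public Java AdminClient. A minimal Scala sketch of
creating a topic that way; the bootstrap address, topic name and sizing
below are placeholders:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

    object CreateTopicSketch extends App {
      val props = new Properties()
      // Placeholder bootstrap address; point this at a real cluster.
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      val admin = AdminClient.create(props)
      try {
        // 3 partitions, replication factor 2: stand-ins for real sizing.
        val topic = new NewTopic("example-topic", 3, 2.toShort)
        admin.createTopics(Collections.singleton(topic)).all().get()
      } finally admin.close()
    }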
---
 core/src/main/scala/kafka/admin/AdminUtils.scala   | 431 +---------
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 890 ---------------------
 .../other/kafka/ReplicationQuotasTestRig.scala     |   6 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    | 213 -----
 .../scala/unit/kafka/admin/TestAdminUtils.scala    |  29 -
 .../test/scala/unit/kafka/utils/ZkUtilsTest.scala  | 133 ---
 6 files changed, 8 insertions(+), 1694 deletions(-)
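
Within the broker and command-line tools, the @deprecated annotations in the
diff below name kafka.zk.AdminZkClient (backed by KafkaZkClient) as the
replacement for the removed AdminUtils entry points. A rough sketch of that
path; the KafkaZkClient factory arguments are assumptions and may differ
slightly between 2.x releases:

    import kafka.zk.{AdminZkClient, KafkaZkClient}
    import org.apache.kafka.common.utils.Time

    object AdminZkClientSketch extends App {
      // Placeholder ZooKeeper address/timeouts; the factory signature is an
      // assumption and may differ slightly across 2.x releases.
      val zkClient = KafkaZkClient("localhost:2181", isSecure = false,
        sessionTimeoutMs = 30000, connectionTimeoutMs = 30000,
        maxInFlightRequests = 10, time = Time.SYSTEM)
      try {
        val adminZkClient = new AdminZkClient(zkClient)
        adminZkClient.createTopic("example-topic", partitions = 3, replicationFactor = 2)
      } finally zkClient.close()
    }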

diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index cd47969..c3748cf 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -17,54 +17,16 @@
 
 package kafka.admin
 
-import kafka.log.LogConfig
-import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils._
-import kafka.utils.ZkUtils._
 import java.util.Random
-import java.util.Properties
 
-import kafka.common.TopicAlreadyMarkedForDeletionException
-import org.apache.kafka.common.errors._
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicationFactorException}
 
-import collection.{Map, Set, mutable, _}
-import scala.collection.JavaConverters._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.apache.kafka.common.internals.Topic
+import collection.{Map, mutable, _}
 
-@deprecated("This class is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-trait AdminUtilities {
-  def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
-  def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
-  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties)
-  def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties)
-
-  def changeConfigs(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties): Unit = {
-
-    def parseBroker(broker: String): Int = {
-      try broker.toInt
-      catch {
-        case _: NumberFormatException =>
-          throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
-      }
-    }
-
-    entityType match {
-      case ConfigType.Topic => changeTopicConfig(zkUtils, entityName, configs)
-      case ConfigType.Client => changeClientIdConfig(zkUtils, entityName, configs)
-      case ConfigType.User => changeUserOrUserClientIdConfig(zkUtils, entityName, configs)
-      case ConfigType.Broker => changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs)
-      case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.User}, ${ConfigType.Broker}")
-    }
-  }
-
-  def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties
-}
-
-object AdminUtils extends Logging with AdminUtilities {
+object AdminUtils extends Logging {
   val rand = new Random
   val AdminClientId = "__admin_client"
-  val EntityConfigChangeZnodePrefix = "config_change_"
 
   /**
    * There are 3 goals of replica assignment:
@@ -256,391 +218,6 @@ object AdminUtils extends Logging with AdminUtilities {
       .groupBy { case (rack, _) => rack }
       .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) }
   }
- /**
-  * Add partitions to existing topic with optional replica assignment
-  *
-  * @param zkUtils Zookeeper utilities
-  * @param topic Topic for adding partitions to
-  * @param existingAssignment A map from partition id to its assigned replicas
-  * @param allBrokers All brokers in the cluster
-  * @param numPartitions Number of partitions to be set
-  * @param replicaAssignment Manual replica assignment, or none
-  * @param validateOnly If true, validate the parameters without actually adding the partitions
-  * @return the updated replica assignment
-  */
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def addPartitions(zkUtils: ZkUtils,
-                    topic: String,
-                    existingAssignment: Map[Int, Seq[Int]],
-                    allBrokers: Seq[BrokerMetadata],
-                    numPartitions: Int = 1,
-                    replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
-                    validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
-    val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
-      throw new AdminOperationException(
-        s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
-          s"Assignment: $existingAssignment"))
-
-    val partitionsToAdd = numPartitions - existingAssignment.size
-    if (partitionsToAdd <= 0)
-      throw new InvalidPartitionsException(
-        s"The number of partitions for a topic can only be increased. " +
-          s"Topic $topic currently has ${existingAssignment.size} partitions, " +
-          s"$numPartitions would not be an increase.")
-
-    replicaAssignment.foreach { proposedReplicaAssignment =>
-      validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0,
-        allBrokers.map(_.id).toSet)
-    }
-
-    val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
-      val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
-      AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
-        startIndex, existingAssignment.size)
-    }
-    val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions
-    if (!validateOnly) {
-      info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
-        s"$proposedAssignmentForNewPartitions.")
-      // add the combined new list
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, proposedAssignment, update = true)
-    }
-    proposedAssignment
-
-  }
-
-  /**
-    * Parse a replica assignment string of the form:
-    * {{{
-    * broker_id_for_part1_replica1:broker_id_for_part1_replica2,
-    * broker_id_for_part2_replica1:broker_id_for_part2_replica2,
-    * ...
-    * }}}
-    */
-  def parseReplicaAssignment(replicaAssignmentsString: String, startPartitionId: Int): Map[Int, Seq[Int]] = {
-    val assignmentStrings = replicaAssignmentsString.split(",")
-    val assignmentMap = mutable.Map[Int, Seq[Int]]()
-    var partitionId = startPartitionId
-    for (assignmentString <- assignmentStrings) {
-      val brokerIds = assignmentString.split(":").map(_.trim.toInt).toSeq
-      assignmentMap.put(partitionId, brokerIds)
-      partitionId = partitionId + 1
-    }
-    assignmentMap
-  }
-
-  private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
-                                        existingAssignmentPartition0: Seq[Int],
-                                        availableBrokerIds: Set[Int]): Unit = {
-
-    replicaAssignment.foreach { case (partitionId, replicas) =>
-      if (replicas.isEmpty)
-        throw new InvalidReplicaAssignmentException(
-          s"Cannot have replication factor of 0 for partition id $partitionId.")
-      if (replicas.size != replicas.toSet.size)
-        throw new InvalidReplicaAssignmentException(
-          s"Duplicate brokers not allowed in replica assignment: " +
-            s"${replicas.mkString(", ")} for partition id $partitionId.")
-      if (!replicas.toSet.subsetOf(availableBrokerIds))
-        throw new BrokerNotAvailableException(
-          s"Some brokers specified for partition id $partitionId are not available. " +
-            s"Specified brokers: ${replicas.mkString(", ")}, " +
-            s"available brokers: ${availableBrokerIds.mkString(", ")}.")
-      partitionId -> replicas.size
-    }
-    val badRepFactors = replicaAssignment.collect {
-      case (partition, replicas) if replicas.size != existingAssignmentPartition0.size => partition -> replicas.size
-    }
-    if (badRepFactors.nonEmpty) {
-      val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId }
-      val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId }
-      val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
-      throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " +
-        s"partition 0 has ${existingAssignmentPartition0.size} while partitions [${partitions.mkString(", ")}] have " +
-        s"replication factors [${repFactors.mkString(", ")}], respectively.")
-    }
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def deleteTopic(zkUtils: ZkUtils, topic: String) {
-    if (topicExists(zkUtils, topic)) {
-      try {
-        zkUtils.createPersistentPath(getDeleteTopicPath(topic))
-      } catch {
-        case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
-          "topic %s is already marked for deletion".format(topic))
-        case e2: Throwable => throw new AdminOperationException(e2)
-      }
-    } else {
-      throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
-    }
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
-    zkUtils.pathExists(getTopicPath(topic))
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
-                         brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
-    val allBrokers = zkUtils.getAllBrokersInCluster()
-    val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
-    val brokersWithRack = brokers.filter(_.rack.nonEmpty)
-    if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
-      throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
-        " to make replica assignment without rack information.")
-    }
-    val brokerMetadatas = rackAwareMode match {
-      case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
-      case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
-        brokers.map(broker => BrokerMetadata(broker.id, None))
-      case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
-    }
-    brokerMetadatas.sortBy(_.id)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def createTopic(zkUtils: ZkUtils,
-                  topic: String,
-                  partitions: Int,
-                  replicationFactor: Int,
-                  topicConfig: Properties = new Properties,
-                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
-    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
-    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def validateCreateOrUpdateTopic(zkUtils: ZkUtils,
-                                  topic: String,
-                                  partitionReplicaAssignment: Map[Int, Seq[Int]],
-                                  config: Properties,
-                                  update: Boolean): Unit = {
-    // validate arguments
-    Topic.validate(topic)
-
-    if (!update) {
-      if (topicExists(zkUtils, topic))
-        throw new TopicExistsException(s"Topic '$topic' already exists.")
-      else if (Topic.hasCollisionChars(topic)) {
-        val allTopics = zkUtils.getAllTopics()
-        // check again in case the topic was created in the meantime, otherwise the
-        // topic could potentially collide with itself
-        if (allTopics.contains(topic))
-          throw new TopicExistsException(s"Topic '$topic' already exists.")
-        val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _))
-        if (collidingTopics.nonEmpty) {
-          throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}")
-        }
-      }
-    }
-
-    if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1)
-      throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
-
-    partitionReplicaAssignment.values.foreach(reps =>
-      if (reps.size != reps.toSet.size)
-        throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
-    )
-
-
-    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
-    if (!update)
-      LogConfig.validate(config)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
-                                                     topic: String,
-                                                     partitionReplicaAssignment: Map[Int, Seq[Int]],
-                                                     config: Properties = new Properties,
-                                                     update: Boolean = false) {
-    validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)
-
-    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
-    if (!update) {
-      // write out the config if there is any, this isn't transactional with the partition assignments
-      writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
-    }
-
-    // create the partition assignment
-    writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
-    try {
-      val zkPath = getTopicPath(topic)
-      val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
-
-      if (!update) {
-        info(s"Topic creation $jsonPartitionData")
-        zkUtils.createPersistentPath(zkPath, jsonPartitionData)
-      } else {
-        info(s"Topic update $jsonPartitionData")
-        zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
-      }
-      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
-    } catch {
-      case _: ZkNodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
-      case e2: Throwable => throw new AdminOperationException(e2.toString)
-    }
-  }
-
-  /**
-   * Update the config for a client and create a change notification so the change will propagate to other brokers.
-   * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
-   * and <user> configs are not specified.
-   *
-   * @param zkUtils Zookeeper utilities used to write the config to ZK
-   * @param sanitizedClientId: The sanitized clientId for which configs are being changed
-   * @param configs: The final set of configs that will be applied to the client. If any new configs need to be added or
-   *                 existing configs need to be deleted, it should be done prior to invoking this API
-   *
-   */
-   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) {
-    DynamicConfig.Client.validate(configs)
-    changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
-  }
-
-  /**
-   * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
-   * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
-   * value to be applied if a more specific override is not configured.
-   *
-   * @param zkUtils Zookeeper utilities used to write the config to ZK
-   * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
-   * @param configs: The final set of configs that will be applied to the entity. If any new configs need to be added or
-   *                 existing configs need to be deleted, it should be done prior to invoking this API
-   *
-   */
-   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
-    if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
-      DynamicConfig.Client.validate(configs)
-    else
-      DynamicConfig.User.validate(configs)
-    changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def validateTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {
-    Topic.validate(topic)
-    if (!topicExists(zkUtils, topic))
-      throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
-    // validate the topic config overrides
-    LogConfig.validate(configs)
-  }
-
-  /**
-   * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
-   *
-   * @param zkUtils Zookeeper utilities used to write the config to ZK
-   * @param topic: The topic for which configs are being changed
-   * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
-   *                 existing configs need to be deleted, it should be done prior to invoking this API
-   *
-   */
-   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
-    validateTopicConfig(zkUtils, topic, configs)
-    changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
-  }
-
-  /**
-    * Override the broker config on some set of brokers. These overrides will be persisted between sessions, and will
-    * override any defaults entered in the broker's config files
-    *
-    * @param zkUtils: Zookeeper utilities used to write the config to ZK
-    * @param brokers: The list of brokers to apply config changes to
-    * @param configs: The config to change, as properties
-    */
-   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
-    DynamicConfig.Broker.validate(configs)
-    brokers.foreach { broker =>
-      changeEntityConfig(zkUtils, ConfigType.Broker, broker.toString, configs)
-    }
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
-    val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
-    val entityConfigPath = getEntityConfigPath(rootEntityType, fullSanitizedEntityName)
-    // write the new config--may not exist if there were previously no overrides
-    writeEntityConfig(zkUtils, entityConfigPath, configs)
-
-    // create the change notification
-    val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
-    val content = Json.legacyEncodeAsString(getConfigChangeZnodeData(sanitizedEntityPath))
-    zkUtils.createSequentialPersistentPath(seqNode, content)
-  }
-
-  def getConfigChangeZnodeData(sanitizedEntityPath: String): Map[String, Any] = {
-    Map("version" -> 2, "entity_path" -> sanitizedEntityPath)
-  }
-
-  /**
-   * Write out the entity config to zk, if there is any
-   */
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
-    val map = Map("version" -> 1, "config" -> config.asScala)
-    zkUtils.updatePersistentPath(entityPath, Json.legacyEncodeAsString(map))
-  }
-
-  /**
-   * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
-   * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
-   */
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
-    val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
-    // readDataMaybeNull returns Some(null) if the path exists, but there is no data
-    val str = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull
-    val props = new Properties()
-    if (str != null) {
-      Json.parseFull(str).foreach { jsValue =>
-        val jsObject = jsValue.asJsonObjectOption.getOrElse {
-          throw new IllegalArgumentException(s"Unexpected value in config: $str, entity_config_path: $entityConfigPath")
-        }
-        require(jsObject("version").to[Int] == 1)
-        val config = jsObject.get("config").flatMap(_.asJsonObjectOption).getOrElse {
-          throw new IllegalArgumentException(s"Invalid $entityConfigPath config: $str")
-        }
-        config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) }
-      }
-    }
-    props
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
-    zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
-    zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
-    def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
-      val root = rootPath match {
-        case Some(path) => rootEntityType + '/' + path
-        case None => rootEntityType
-      }
-      val entityNames = zkUtils.getAllEntitiesWithConfig(root)
-      rootPath match {
-        case Some(path) => entityNames.map(entityName => path + '/' + entityName)
-        case None => entityNames
-      }
-    }
-    entityPaths(zkUtils, None)
-      .flatMap(entity => entityPaths(zkUtils, Some(entity + '/' + childEntityType)))
-      .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, rootEntityType, entityPath))).toMap
-  }
 
   private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
     val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
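
The config helpers deleted above (changeTopicConfig, fetchEntityConfig and
friends) likewise have public equivalents on AdminClient. A minimal sketch of
reading and overriding a topic config, taking an AdminClient like the one from
the earlier sketch; topic name and value are placeholders, and newer releases
prefer incrementalAlterConfigs over alterConfigs:

    import java.util.Collections
    import org.apache.kafka.clients.admin.{AdminClient, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    def showAndOverrideTopicConfig(admin: AdminClient): Unit = {
      val resource = new ConfigResource(ConfigResource.Type.TOPIC, "example-topic")
      // Roughly replaces AdminUtils.fetchEntityConfig: read the current overrides.
      val current = admin.describeConfigs(Collections.singleton(resource)).all().get()
      println(current.get(resource))
      // Roughly replaces AdminUtils.changeTopicConfig: set a retention override.
      val update = new Config(Collections.singleton(new ConfigEntry("retention.ms", "86400000")))
      admin.alterConfigs(Collections.singletonMap(resource, update)).all().get()
    }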
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
deleted file mode 100644
index dd85090..0000000
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ /dev/null
@@ -1,890 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.nio.charset.StandardCharsets
-
-import kafka.admin._
-import kafka.api.LeaderAndIsr
-import kafka.cluster._
-import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
-import kafka.zk.{BrokerIdZNode, ReassignPartitionsZNode, ZkData}
-import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
-import org.I0Itec.zkclient.serialize.ZkSerializer
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
-import org.apache.kafka.common.config.ConfigException
-import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.ZooDefs
-
-import scala.collection._
-import scala.collection.JavaConverters._
-import org.apache.kafka.common.TopicPartition
-
-@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " +
-  "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0")
-object ZkUtils {
-
-  private val UseDefaultAcls = new java.util.ArrayList[ACL]
-
-  // Important: it is necessary to add any new top level Zookeeper path here
-  val AdminPath = "/admin"
-  val BrokersPath = "/brokers"
-  val ClusterPath = "/cluster"
-  val ConfigPath = "/config"
-  val ControllerPath = "/controller"
-  val ControllerEpochPath = "/controller_epoch"
-  val IsrChangeNotificationPath = "/isr_change_notification"
-  val LogDirEventNotificationPath = "/log_dir_event_notification"
-  val KafkaAclPath = "/kafka-acl"
-  val KafkaAclChangesPath = "/kafka-acl-changes"
-
-  val ConsumersPath = "/consumers"
-  val ClusterIdPath = s"$ClusterPath/id"
-  val BrokerIdsPath = s"$BrokersPath/ids"
-  val BrokerTopicsPath = s"$BrokersPath/topics"
-  val ReassignPartitionsPath = s"$AdminPath/reassign_partitions"
-  val DeleteTopicsPath = s"$AdminPath/delete_topics"
-  val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election"
-  val BrokerSequenceIdPath = s"$BrokersPath/seqid"
-  val ConfigChangesPath = s"$ConfigPath/changes"
-  val ConfigUsersPath = s"$ConfigPath/users"
-  val ConfigBrokersPath = s"$ConfigPath/brokers"
-  val ProducerIdBlockPath = "/latest_producer_id_block"
-
-  val SecureZkRootPaths = ZkData.SecureRootPaths
-
-  val SensitiveZkRootPaths = ZkData.SensitiveRootPaths
-
-  def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
-    val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
-    new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
-  }
-
-  /*
-   * Used in tests
-   */
-  def apply(zkClient: ZkClient, isZkSecurityEnabled: Boolean): ZkUtils = {
-    new ZkUtils(zkClient, null, isZkSecurityEnabled)
-  }
-
-  def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
-    val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
-    zkClient
-  }
-
-  def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): (ZkClient, ZkConnection) = {
-    val zkConnection = new ZkConnection(zkUrl, sessionTimeout)
-    val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer)
-    (zkClient, zkConnection)
-  }
-
-  def sensitivePath(path: String): Boolean = ZkData.sensitivePath(path)
-
-  def defaultAcls(isSecure: Boolean, path: String): java.util.List[ACL] = ZkData.defaultAcls(isSecure, path).asJava
-
-  def maybeDeletePath(zkUrl: String, dir: String) {
-    try {
-      val zk = createZkClient(zkUrl, 30*1000, 30*1000)
-      zk.deleteRecursive(dir)
-      zk.close()
-    } catch {
-      case _: Throwable => // swallow
-    }
-  }
-
-  /*
-   * Get calls that only depend on static paths
-   */
-  def getTopicPath(topic: String): String = {
-    ZkUtils.BrokerTopicsPath + "/" + topic
-  }
-
-  def getTopicPartitionsPath(topic: String): String = {
-    getTopicPath(topic) + "/partitions"
-  }
-
-  def getTopicPartitionPath(topic: String, partitionId: Int): String =
-    getTopicPartitionsPath(topic) + "/" + partitionId
-
-  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
-    getTopicPartitionPath(topic, partitionId) + "/" + "state"
-
-  def getEntityConfigRootPath(entityType: String): String =
-    ZkUtils.ConfigPath + "/" + entityType
-
-  def getEntityConfigPath(entityType: String, entity: String): String =
-    getEntityConfigRootPath(entityType) + "/" + entity
-
-  def getEntityConfigPath(entityPath: String): String =
-    ZkUtils.ConfigPath + "/" + entityPath
-
-  def getDeleteTopicPath(topic: String): String =
-    DeleteTopicsPath + "/" + topic
-
-  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
-    val utf8Bytes = jsonData.getBytes(StandardCharsets.UTF_8)
-    val assignments = ReassignPartitionsZNode.decode(utf8Bytes) match {
-      case Left(e) => throw e
-      case Right(result) => result
-    }
-
-    assignments.map { case (tp, p) => (new TopicAndPartition(tp), p) }
-  }
-
-  def controllerZkData(brokerId: Int, timestamp: Long): String = {
-    Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
-  }
-
-  def preferredReplicaLeaderElectionZkData(partitions: scala.collection.Set[TopicAndPartition]): String = {
-    Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))))
-  }
-
-  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
-    Json.legacyEncodeAsString(Map(
-      "version" -> 1,
-      "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
-        Map(
-          "topic" -> topic,
-          "partition" -> partition,
-          "replicas" -> replicas
-        )
-      }
-    ))
-  }
-
-  def getReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]]): String = {
-    Json.encodeAsString(Map(
-      "version" -> 1,
-      "partitions" -> partitionsToBeReassigned.map { case (tp, replicas) =>
-        Map(
-          "topic" -> tp.topic,
-          "partition" -> tp.partition,
-          "replicas" -> replicas.asJava
-        ).asJava
-      }.asJava
-    ).asJava)
-  }
-}
-
-/**
- * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead.
- */
-@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " +
-  "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0")
-class ZkUtils(val zkClient: ZkClient,
-              val zkConnection: ZkConnection,
-              val isSecure: Boolean) extends Logging {
-  import ZkUtils._
-
-  // These are persistent ZK paths that should exist on kafka broker startup.
-  val persistentZkPaths = ZkData.PersistentZkPaths
-
-  // Visible for testing
-  val zkPath = new ZkPath(zkClient)
-
-  def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path)
-
-  def getController(): Int = {
-    readDataMaybeNull(ControllerPath)._1 match {
-      case Some(controller) => parseControllerId(controller)
-      case None => throw new KafkaException("Controller doesn't exist")
-    }
-  }
-
-  def parseControllerId(controllerInfoString: String): Int = {
-    try {
-      Json.parseFull(controllerInfoString) match {
-        case Some(js) => js.asJsonObject("brokerid").to[Int]
-        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
-      }
-    } catch {
-      case _: Throwable =>
-        // It may be due to an incompatible controller registration version
-        warn("Failed to parse the controller info as json. "
-          + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
-        try controllerInfoString.toInt
-        catch {
-          case t: Throwable => throw new KafkaException(s"Failed to parse the controller info: $controllerInfoString. This is neither the new nor the old format.", t)
-        }
-    }
-  }
-
-  /* Represents a cluster identifier. Stored in Zookeeper in JSON format: {"version" -> "1", "id" -> id } */
-  object ClusterId {
-
-    def toJson(id: String) = {
-      Json.legacyEncodeAsString(Map("version" -> "1", "id" -> id))
-    }
-
-    def fromJson(clusterIdJson: String): String = {
-      Json.parseFull(clusterIdJson).map(_.asJsonObject("id").to[String]).getOrElse {
-        throw new KafkaException(s"Failed to parse the cluster id json $clusterIdJson")
-      }
-    }
-  }
-
-  def getClusterId: Option[String] =
-    readDataMaybeNull(ClusterIdPath)._1.map(ClusterId.fromJson)
-
-  def createOrGetClusterId(proposedClusterId: String): String = {
-    try {
-      createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId))
-      proposedClusterId
-    } catch {
-      case _: ZkNodeExistsException =>
-        getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper."))
-    }
-  }
-
-  def getSortedBrokerList(): Seq[Int] =
-    getChildren(BrokerIdsPath).map(_.toInt).sorted
-
-  def getAllBrokersInCluster(): Seq[Broker] = {
-    val brokerIds = getChildrenParentMayNotExist(BrokerIdsPath).sorted
-    brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get)
-  }
-
-  def getLeaderAndIsrForPartition(topic: String, partition: Int): Option[LeaderAndIsr] = {
-    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val (leaderAndIsrOpt, stat) = readDataMaybeNull(leaderAndIsrPath)
-    debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
-    leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat).map(_.leaderAndIsr))
-  }
-
-  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr).flatMap { js =>
-      val leaderIsrAndEpochInfo = js.asJsonObject
-      val leader = leaderIsrAndEpochInfo("leader").to[Int]
-      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
-      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
-      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
-      val zkPathVersion = stat.getVersion
-      trace(s"Leader $leader, Epoch $epoch, Isr $isr, Zk path version $zkPathVersion for leaderAndIsrPath $path")
-      Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}
-  }
-
-  def setupCommonPaths() {
-    for(path <- persistentZkPaths)
-      makeSurePersistentPathExists(path)
-  }
-
-  def getLeaderForPartition(topic: String, partition: Int): Option[Int] = {
-    readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1.flatMap { leaderAndIsr =>
-      Json.parseFull(leaderAndIsr).map(_.asJsonObject("leader").to[Int])
-    }
-  }
-
-  /**
-   * This API reads the epoch from the ISR path. Reading it there is sufficient, since if the
-   * leader fails after updating the epoch in the leader path but before updating it in the ISR path, effectively some
-   * other broker will retry becoming leader with the same new epoch value.
-   */
-  def getEpochForPartition(topic: String, partition: Int): Int = {
-    readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1 match {
-      case Some(leaderAndIsr) =>
-        Json.parseFull(leaderAndIsr) match {
-          case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for partition [%s,%d] is invalid".format(topic, partition))
-          case Some(js) => js.asJsonObject("leader_epoch").to[Int]
-        }
-      case None => throw new NoEpochForPartitionException("No epoch, ISR path for partition [%s,%d] is empty"
-        .format(topic, partition))
-    }
-  }
-
-  /** Returns a sequence id generated by updating BrokerSequenceIdPath in ZK.
-    * Users can provide a brokerId in the config; in order to avoid conflicts between the
-    * ZK-generated seqId and config.brokerId, we increment the ZK seqId by KafkaConfig.MaxReservedBrokerId.
-    */
-  def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = {
-    getSequenceId(BrokerSequenceIdPath) + MaxReservedBrokerId
-  }
-
-  /**
-   * Gets the in-sync replicas (ISR) for a specific topic and partition
-   */
-  def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
-    val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
-    leaderAndIsrOpt match {
-      case Some(leaderAndIsr) =>
-        Json.parseFull(leaderAndIsr) match {
-          case Some(js) => js.asJsonObject("isr").to[Seq[Int]]
-          case None => Seq.empty[Int]
-        }
-      case None => Seq.empty[Int]
-    }
-  }
-
-  /**
-   * Gets the assigned replicas (AR) for a specific topic and partition
-   */
-  def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
-    val seqOpt = for {
-      jsonPartitionMap <- readDataMaybeNull(getTopicPath(topic))._1
-      js <- Json.parseFull(jsonPartitionMap)
-      replicaMap <- js.asJsonObject.get("partitions")
-      seq <- replicaMap.asJsonObject.get(partition.toString)
-    } yield seq.to[Seq[Int]]
-    seqOpt.getOrElse(Seq.empty)
-  }
-
-  def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
-    Json.legacyEncodeAsString(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
-                    "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
-  }
-
-  /**
-   * Render the partition to replica map as the JSON stored in zookeeper.
-   */
-  def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
-    Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> map))
-  }
-
-  /**
-   *  Make sure a persistent path exists in ZK; create the path if it does not exist.
-   */
-  def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = UseDefaultAcls) {
-    //Consumer path is kept open as different consumers will write under this node.
-    val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) {
-      ZooDefs.Ids.OPEN_ACL_UNSAFE
-    } else if (acls eq UseDefaultAcls) {
-      ZkUtils.defaultAcls(isSecure, path)
-    } else {
-      acls
-    }
-
-    if (!zkClient.exists(path))
-      zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
-  }
-
-  /**
-   *  create the parent path
-   */
-  private def createParentPath(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    val parentDir = path.substring(0, path.lastIndexOf('/'))
-    if (parentDir.length != 0) {
-      zkPath.createPersistent(parentDir, createParents = true, acl)
-    }
-  }
-
-  /**
-   * Create an ephemeral node with the given path and data. Create parents if necessary.
-   */
-  private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL]): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      zkPath.createEphemeral(path, data, acl)
-    } catch {
-      case _: ZkNoNodeException =>
-        createParentPath(path)
-        zkPath.createEphemeral(path, data, acl)
-    }
-  }
-
-  /**
-   * Create an ephemeral node with the given path and data.
-   * Throws NodeExistsException if the node already exists.
-   */
-  def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      createEphemeralPath(path, data, acl)
-    } catch {
-      case e: ZkNodeExistsException =>
-        // this can happen when there is connection loss; make sure the data is what we intend to write
-        var storedData: String = null
-        try {
-          storedData = readData(path)._1
-        } catch {
-          case _: ZkNoNodeException => // the node disappeared; treat as if the node existed and let the caller handle this
-        }
-        if (storedData == null || storedData != data) {
-          info(s"conflict in $path data: $data stored data: $storedData")
-          throw e
-        } else {
-          // otherwise, the creation succeeded, return normally
-          info(s"$path exists with value $data during connection loss; this is ok")
-        }
-    }
-  }
-
-  /**
-   * Create a persistent node with the given path and data. Create parents if necessary.
-   */
-  def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      zkPath.createPersistent(path, data, acl)
-    } catch {
-      case _: ZkNoNodeException =>
-        createParentPath(path)
-        zkPath.createPersistent(path, data, acl)
-    }
-  }
-
-  def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    zkPath.createPersistentSequential(path, data, acl)
-  }
-
-  /**
-   * Update the value of a persistent node with the given path and data.
-   * Create the parent directory if necessary. Never throws NodeExistsException.
-   * Returns the updated zkVersion of the path.
-   */
-  def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      zkClient.writeData(path, data)
-    } catch {
-      case _: ZkNoNodeException =>
-        createParentPath(path)
-        try {
-          zkPath.createPersistent(path, data, acl)
-        } catch {
-          case _: ZkNodeExistsException =>
-            zkClient.writeData(path, data)
-        }
-    }
-  }
-
-  /**
-   * Conditionally update the persistent path data; return (true, newVersion) if it succeeds, otherwise (the path doesn't
-   * exist, the current version is not the expected version, etc.) return (false, -1)
-   *
-   * When there is a ConnectionLossException during the conditional update, zkClient will retry the update and may fail
-   * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one).
-   * In this case, we will run the optionalChecker to further check whether the previous write did indeed succeed.
-   */
-  def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int,
-    optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
-    try {
-      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
-      debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
-        .format(path, data, expectVersion, stat.getVersion))
-      (true, stat.getVersion)
-    } catch {
-      case e1: ZkBadVersionException =>
-        optionalChecker match {
-          case Some(checker) => checker(this, path, data)
-          case _ =>
-            debug("Checker method is not passed skipping zkData match")
-            debug("Conditional update of path %s with data %s and expected version %d failed due to %s"
-              .format(path, data,expectVersion, e1.getMessage))
-            (false, -1)
-        }
-      case e2: Exception =>
-        debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e2.getMessage))
-        (false, -1)
-    }
-  }
-
-  /**
-   * Conditionally update the persistent path data; return (true, newVersion) if it succeeds, otherwise (the current
-   * version is not the expected version, etc.) return (false, -1). If path doesn't exist, throws ZkNoNodeException
-   */
-  def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = {
-    try {
-      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
-      debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
-        .format(path, data, expectVersion, stat.getVersion))
-      (true, stat.getVersion)
-    } catch {
-      case nne: ZkNoNodeException => throw nne
-      case e: Exception =>
-        error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e.getMessage))
-        (false, -1)
-    }
-  }
-
-  /**
-   * Update the value of an ephemeral node with the given path and data.
-   * Create the parent directory if necessary. Never throws NodeExistsException.
-   */
-  def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      zkClient.writeData(path, data)
-    } catch {
-      case _: ZkNoNodeException =>
-        createParentPath(path)
-        zkPath.createEphemeral(path, data, acl)
-    }
-  }
-
-  def deletePath(path: String): Boolean = {
-    zkClient.delete(path)
-  }
-
-  /**
-    * Conditionally delete the persistent path data; return true if it succeeds,
-    * false otherwise (the current version is not the expected version)
-    */
-   def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
-    try {
-      zkClient.delete(path, expectedVersion)
-      true
-    } catch {
-      case _: ZkBadVersionException => false
-    }
-  }
-
-  def deletePathRecursive(path: String) {
-    zkClient.deleteRecursive(path)
-  }
-
-  def subscribeDataChanges(path: String, listener: IZkDataListener): Unit =
-    zkClient.subscribeDataChanges(path, listener)
-
-  def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit =
-    zkClient.unsubscribeDataChanges(path, dataListener)
-
-  def subscribeStateChanges(listener: IZkStateListener): Unit =
-    zkClient.subscribeStateChanges(listener)
-
-  def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] =
-    Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala)
-
-  def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit =
-    zkClient.unsubscribeChildChanges(path, childListener)
-
-  def unsubscribeAll(): Unit =
-    zkClient.unsubscribeAll()
-
-  def readData(path: String): (String, Stat) = {
-    val stat: Stat = new Stat()
-    val dataStr: String = zkClient.readData[String](path, stat)
-    (dataStr, stat)
-  }
-
-  def readDataMaybeNull(path: String): (Option[String], Stat) = {
-    val stat = new Stat()
-    val dataAndStat = try {
-                        val dataStr = zkClient.readData[String](path, stat)
-                        (Some(dataStr), stat)
-                      } catch {
-                        case _: ZkNoNodeException =>
-                          (None, stat)
-                      }
-    dataAndStat
-  }
-
-  def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
-    val stat = new Stat()
-    try {
-      val data = zkClient.readData[String](path, stat)
-      (Option(data), stat.getVersion)
-    } catch {
-      case _: ZkNoNodeException => (None, stat.getVersion)
-    }
-  }
-
-  def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
-
-  def getChildrenParentMayNotExist(path: String): Seq[String] = {
-    try {
-      zkClient.getChildren(path).asScala
-    } catch {
-      case _: ZkNoNodeException => Nil
-    }
-  }
-
-  /**
-   * Check if the given path exists
-   */
-  def pathExists(path: String): Boolean = {
-    zkClient.exists(path)
-  }
-
-  def isTopicMarkedForDeletion(topic: String): Boolean = {
-    pathExists(getDeleteTopicPath(topic))
-  }
-
-  def getCluster(): Cluster = {
-    val cluster = new Cluster
-    val nodes = getChildrenParentMayNotExist(BrokerIdsPath)
-    for (node <- nodes) {
-      val brokerZKString = readData(BrokerIdsPath + "/" + node)._1
-      cluster.add(parseBrokerJson(node.toInt, brokerZKString))
-    }
-    cluster
-  }
-
-  private def parseBrokerJson(id: Int, jsonString: String): Broker = {
-    BrokerIdZNode.decode(id, jsonString.getBytes(StandardCharsets.UTF_8)).broker
-  }
-
-  def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
-    val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
-    for(topicAndPartition <- topicAndPartitions) {
-      getLeaderIsrAndEpochForPartition(topicAndPartition.topic, topicAndPartition.partition).foreach { leaderIsrAndControllerEpoch =>
-        ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
-      }
-    }
-    ret
-  }
-
-  private[utils] def getLeaderIsrAndEpochForPartition(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
-    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val (leaderAndIsrOpt, stat) = readDataMaybeNull(leaderAndIsrPath)
-    debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
-    leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
-  }
-
-  def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
-    val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
-    topics.foreach { topic =>
-      readDataMaybeNull(getTopicPath(topic))._1.foreach { jsonPartitionMap =>
-        Json.parseFull(jsonPartitionMap).foreach { js =>
-          js.asJsonObject.get("partitions").foreach { partitionsJs =>
-            partitionsJs.asJsonObject.iterator.foreach { case (partition, replicas) =>
-              ret.put(TopicAndPartition(topic, partition.toInt), replicas.to[Seq[Int]])
-              debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
-            }
-          }
-        }
-      }
-    }
-    ret
-  }
-
-  def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
-    val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
-    topics.foreach { topic =>
-      val partitionMapOpt = for {
-        jsonPartitionMap <- readDataMaybeNull(getTopicPath(topic))._1
-        js <- Json.parseFull(jsonPartitionMap)
-        replicaMap <- js.asJsonObject.get("partitions")
-      } yield replicaMap.asJsonObject.iterator.map { case (k, v) => (k.toInt, v.to[Seq[Int]]) }.toMap
-      val partitionMap = partitionMapOpt.getOrElse(Map.empty)
-      debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
-      ret += (topic -> partitionMap)
-    }
-    ret
-  }
-
-  def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
-    getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
-      val topic = topicAndPartitionMap._1
-      val partitionMap = topicAndPartitionMap._2
-      debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
-      topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t)
-    }
-  }
-
-  def getTopicPartitionCount(topic: String): Option[Int] = {
-    val topicData = getPartitionAssignmentForTopics(Seq(topic))
-    if (topicData(topic).nonEmpty)
-      Some(topicData(topic).size)
-    else
-      None
-  }
-
-  def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext] = {
-    // read the partitions and their new replica list
-    val jsonPartitionMapOpt = readDataMaybeNull(ReassignPartitionsPath)._1
-    jsonPartitionMapOpt match {
-      case Some(jsonPartitionMap) =>
-        val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
-        reassignedPartitions.map { case (tp, newReplicas) =>
-          tp -> new ReassignedPartitionsContext(newReplicas, null)
-        }
-      case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
-    }
-  }
-
-  def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
-    val zkPath = ZkUtils.ReassignPartitionsPath
-    partitionsToBeReassigned.size match {
-      case 0 => // need to delete the /admin/reassign_partitions path
-        deletePath(zkPath)
-        info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
-      case _ =>
-        val jsonData = formatAsReassignmentJson(partitionsToBeReassigned)
-        try {
-          updatePersistentPath(zkPath, jsonData)
-          debug("Updated partition reassignment path with %s".format(jsonData))
-        } catch {
-          case _: ZkNoNodeException =>
-            createPersistentPath(zkPath, jsonData)
-            debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
-          case e2: Throwable => throw new AdminOperationException(e2.toString)
-        }
-    }
-  }
-
-  def getPartitionsUndergoingPreferredReplicaElection(): Set[TopicAndPartition] = {
-    // read the partitions and their new replica list
-    val jsonPartitionListOpt = readDataMaybeNull(PreferredReplicaLeaderElectionPath)._1
-    jsonPartitionListOpt match {
-      case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList).map(tp => new TopicAndPartition(tp))
-      case None => Set.empty[TopicAndPartition]
-    }
-  }
-
-  def deletePartition(brokerId: Int, topic: String) {
-    val brokerIdPath = BrokerIdsPath + "/" + brokerId
-    zkClient.delete(brokerIdPath)
-    val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
-    zkClient.delete(brokerPartTopicPath)
-  }
-
-  /**
-   * This API takes in a broker id and queries zookeeper for the broker metadata,
-   * returning None if the broker is no longer registered by the time the query completes
-   *
-   * @param brokerId The broker id
-   * @return An optional Broker object encapsulating the broker metadata
-   */
-  def getBrokerInfo(brokerId: Int): Option[Broker] = {
-    readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
-      case Some(brokerInfo) => Some(parseBrokerJson(brokerId, brokerInfo))
-      case None => None
-    }
-  }
-
-  /**
-    * This API produces a sequence number by creating / updating given path in zookeeper
-    * It uses the stat returned by the zookeeper and return the version. Every time
-    * client updates the path stat.version gets incremented. Starting value of sequence number is 1.
-    */
-  def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = {
-    val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
-    try {
-      writeToZk
-    } catch {
-      case _: ZkNoNodeException =>
-        makeSurePersistentPathExists(path, acl)
-        writeToZk
-    }
-  }
-
-  def getAllTopics(): Seq[String] = {
-    val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
-    if(topics == null)
-      Seq.empty[String]
-    else
-      topics
-  }
-
-  /**
-   * Returns all the entities whose configs have been overridden.
-   */
-  def getAllEntitiesWithConfig(entityType: String): Seq[String] = {
-    val entities = getChildrenParentMayNotExist(getEntityConfigRootPath(entityType))
-    if(entities == null)
-      Seq.empty[String]
-    else
-      entities
-  }
-
-  def getAllPartitions(): Set[TopicAndPartition] = {
-    val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
-    if (topics == null) Set.empty[TopicAndPartition]
-    else {
-      topics.flatMap { topic =>
-        // The partitions path may not exist if the topic is in the process of being deleted
-        getChildrenParentMayNotExist(getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _))
-      }.toSet
-    }
-  }
-
-  def close() {
-    zkClient.close()
-  }
-}
-
-private object ZKStringSerializer extends ZkSerializer {
-
-  @throws(classOf[ZkMarshallingError])
-  def serialize(data : Object): Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
-
-  @throws(classOf[ZkMarshallingError])
-  def deserialize(bytes : Array[Byte]): Object = {
-    if (bytes == null)
-      null
-    else
-      new String(bytes, "UTF-8")
-  }
-}
-
-object ZKConfig {
-  val ZkConnectProp = "zookeeper.connect"
-  val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
-  val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
-  val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
-}
-
-class ZKConfig(props: VerifiableProperties) {
-  import ZKConfig._
-
-  /** ZK host string */
-  val zkConnect = props.getString(ZkConnectProp)
-
-  /** zookeeper session timeout */
-  val zkSessionTimeoutMs = props.getInt(ZkSessionTimeoutMsProp, 6000)
-
-  /** the max time that the client waits to establish a connection to zookeeper */
-  val zkConnectionTimeoutMs = props.getInt(ZkConnectionTimeoutMsProp, zkSessionTimeoutMs)
-
-  /** how far a ZK follower can be behind a ZK leader */
-  val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
-}
-
-class ZkPath(zkClient: ZkClient) {
-
-  @volatile private var isNamespacePresent: Boolean = false
-
-  def checkNamespace() {
-    if (isNamespacePresent)
-      return
-
-    if (!zkClient.exists("/")) {
-      throw new ConfigException("Zookeeper namespace does not exist")
-    }
-    isNamespacePresent = true
-  }
-
-  def resetNamespaceCheckedState() {
-    isNamespacePresent = false
-  }
-
-  def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
-    checkNamespace()
-    zkClient.createPersistent(path, data, acls)
-  }
-
-  def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) {
-    checkNamespace()
-    zkClient.createPersistent(path, createParents, acls)
-  }
-
-  def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
-    checkNamespace()
-    zkClient.createEphemeral(path, data, acls)
-  }
-
-  def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = {
-    checkNamespace()
-    zkClient.createPersistentSequential(path, data, acls)
-  }
-}
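
For code that still depends on the removed ZkUtils accessors above, the
supported replacements are kafka.zk.KafkaZkClient and kafka.zk.AdminZkClient.
A minimal migration sketch follows; the connection settings and the
KafkaZkClient factory arguments are assumptions for illustration, not part of
this commit:

    import java.util.Properties

    import kafka.admin.RackAwareMode
    import kafka.zk.{AdminZkClient, KafkaZkClient}
    import org.apache.kafka.common.utils.Time

    object ZkUtilsMigrationSketch extends App {
      // Hypothetical settings: connect string, isSecure, session/connection
      // timeouts (ms), max in-flight requests.
      val zkClient = KafkaZkClient("localhost:2181", false, 30000, 30000, 10, Time.SYSTEM)
      val adminZkClient = new AdminZkClient(zkClient)
      try {
        // Replaces zkUtils.getAllTopics()
        println(zkClient.getAllTopicsInCluster)
        // Replaces AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor)
        adminZkClient.createTopic("demo-topic", 3, 1, new Properties, RackAwareMode.Safe)
      } finally zkClient.close()
    }
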
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 20c28e7..4ec8665 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -18,6 +18,7 @@
 package kafka
 
 import java.io.{File, PrintWriter}
+import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, StandardOpenOption}
 import javax.imageio.ImageIO
 
@@ -25,7 +26,7 @@ import kafka.admin.ReassignPartitionsCommand
 import kafka.admin.ReassignPartitionsCommand.Throttle
 import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils._
-import kafka.utils.{Exit, Logging, TestUtils, ZkUtils}
+import kafka.utils.{Exit, Logging, TestUtils}
 import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
@@ -138,7 +139,8 @@ object ReplicationQuotasTestRig {
       val newAssignment = ReassignPartitionsCommand.generateAssignment(zkClient, brokers, json(topicName), true)._1
 
       val start = System.currentTimeMillis()
-      ReassignPartitionsCommand.executeAssignment(zkClient, None, ZkUtils.getReassignmentJson(newAssignment), Throttle(config.throttle))
+      ReassignPartitionsCommand.executeAssignment(zkClient, None,
+        new String(ReassignPartitionsZNode.encode(newAssignment), StandardCharsets.UTF_8), Throttle(config.throttle))
 
       //Await completion
       waitForReassignmentToComplete()
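
The hunk above replaces ZkUtils.getReassignmentJson with
ReassignPartitionsZNode.encode, which emits the same versioned JSON payload.
A minimal sketch of the new call, using a hypothetical single-partition
assignment:

    import java.nio.charset.StandardCharsets

    import kafka.zk.ReassignPartitionsZNode
    import org.apache.kafka.common.TopicPartition

    // Hypothetical reassignment: move demo-0 onto brokers 1 and 2.
    val assignment = Map(new TopicPartition("demo", 0) -> Seq(1, 2))
    val json = new String(ReassignPartitionsZNode.encode(assignment), StandardCharsets.UTF_8)
    // Expected shape: {"version":1,"partitions":[{"topic":"demo","partition":0,"replicas":[1,2]}]}
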
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
deleted file mode 100755
index fa4206b..0000000
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
-import org.apache.kafka.common.metrics.Quota
-import org.easymock.EasyMock
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import java.util.Properties
-
-import kafka.utils._
-import kafka.zk.{ConfigEntityZNode, ZooKeeperTestHarness}
-import kafka.utils.{Logging, TestUtils, ZkUtils}
-import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
-
-import scala.collection.{Map, immutable}
-import org.apache.kafka.common.security.JaasUtils
-import org.scalatest.Assertions.intercept
-
-import scala.collection.JavaConverters._
-
-@deprecated("This test has been deprecated and will be removed in a future release.", "1.1.0")
-class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
-
-  var servers: Seq[KafkaServer] = Seq()
-  var zkUtils: ZkUtils = null
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-  }
-
-  @After
-  override def tearDown() {
-    if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close(), this)
-    TestUtils.shutdownServers(servers)
-    super.tearDown()
-  }
-
-  @Test
-  def testManualReplicaAssignment() {
-    val brokers = List(0, 1, 2, 3, 4)
-    TestUtils.createBrokersInZk(zkClient, brokers)
-
-    // duplicate brokers
-    intercept[InvalidReplicaAssignmentException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,0)))
-    }
-
-    // inconsistent replication factor
-    intercept[InvalidReplicaAssignmentException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,1), 1->Seq(0)))
-    }
-
-    // good assignment
-    val assignment = Map(0 -> List(0, 1, 2),
-                         1 -> List(1, 2, 3))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", assignment)
-    val found = zkUtils.getPartitionAssignmentForTopics(Seq("test"))
-    assertEquals(assignment, found("test"))
-  }
-
-  @Test
-  def testTopicCreationInZK() {
-    val expectedReplicaAssignment = Map(
-      0  -> List(0, 1, 2),
-      1  -> List(1, 2, 3),
-      2  -> List(2, 3, 4),
-      3  -> List(3, 4, 0),
-      4  -> List(4, 0, 1),
-      5  -> List(0, 2, 3),
-      6  -> List(1, 3, 4),
-      7  -> List(2, 4, 0),
-      8  -> List(3, 0, 1),
-      9  -> List(4, 1, 2),
-      10 -> List(1, 2, 3),
-      11 -> List(1, 3, 4)
-    )
-    val leaderForPartitionMap = immutable.Map(
-      0 -> 0,
-      1 -> 1,
-      2 -> 2,
-      3 -> 3,
-      4 -> 4,
-      5 -> 0,
-      6 -> 1,
-      7 -> 2,
-      8 -> 3,
-      9 -> 4,
-      10 -> 1,
-      11 -> 1
-    )
-    val topic = "test"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
-    // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
-    // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => p -> zkUtils.getReplicasForPartition(topic, p)).toMap
-    assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
-    for(i <- 0 until actualReplicaList.size)
-      assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
-
-    intercept[TopicExistsException] {
-      // shouldn't be able to create a topic that already exists
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
-    }
-  }
-
-  @Test
-  def testTopicCreationWithCollision() {
-    val topic = "test.topic"
-    val collidingTopic = "test_topic"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
-    // create the topic
-    AdminUtils.createTopic(zkUtils, topic, 3, 1)
-
-    intercept[InvalidTopicException] {
-      // shouldn't be able to create a topic that collides
-      AdminUtils.createTopic(zkUtils, collidingTopic, 3, 1)
-    }
-  }
-
-  @Test
-  def testConcurrentTopicCreation() {
-    val topic = "test.topic"
-
-    // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
-    val zkMock: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false)
-    EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic"))
-    EasyMock.replay(zkMock)
-
-    intercept[TopicExistsException] {
-      AdminUtils.validateCreateOrUpdateTopic(zkMock, topic, Map.empty, new Properties, update = false)
-    }
-  }
-
-  /**
-   * This test simulates a client config change in ZK whose notification has been purged.
-   * It asserts that existing config overrides are read from ZK at startup even though the notification is gone.
-   */
-  @Test
-  def testBootstrapClientIdConfig() {
-    val clientId = "my-client"
-    val props = new Properties()
-    props.setProperty("producer_byte_rate", "1000")
-    props.setProperty("consumer_byte_rate", "2000")
-
-    // Write config without notification to ZK.
-    val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000")
-    val map = Map("version" -> 1, "config" -> configMap.asJava)
-    zkUtils.updatePersistentPath(ConfigEntityZNode.path(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))
-
-    val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client)
-    assertEquals("Must have 1 overridden client config", 1, configInZk.size)
-    assertEquals(props, configInZk(clientId))
-
-    // Test that the existing clientId overrides are read
-    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
-    servers = Seq(server)
-    assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId))
-    assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId))
-  }
-
-  @Test
-  def testGetBrokerMetadatas() {
-    // broker 4 has no rack information
-    val brokerList = 0 to 5
-    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3")
-    val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet))
-    TestUtils.createBrokersInZk(brokerMetadatas, zkClient)
-
-    val processedMetadatas1 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Disabled)
-    assertEquals(brokerList, processedMetadatas1.map(_.id))
-    assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack))
-
-    val processedMetadatas2 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Safe)
-    assertEquals(brokerList, processedMetadatas2.map(_.id))
-    assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack))
-
-    intercept[AdminOperationException] {
-      AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
-    }
-
-    val partialList = List(0, 1, 2, 3, 5)
-    val processedMetadatas3 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced, Some(partialList))
-    assertEquals(partialList, processedMetadatas3.map(_.id))
-    assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack))
-
-    val numPartitions = 3
-    AdminUtils.createTopic(zkUtils, "foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe)
-    val assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo"))
-    assertEquals(numPartitions, assignment.size)
-  }
-}
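
The assignment-validation cases exercised by testManualReplicaAssignment are
straightforward to restate outside the deleted test. A self-contained sketch of
the two invariants it checked (illustrative only, not the broker's
implementation):

    // Each partition's replica list must be duplicate-free, and all partitions
    // must share one replication factor.
    def validateAssignment(assignment: Map[Int, Seq[Int]]): Unit = {
      require(assignment.nonEmpty, "assignment must not be empty")
      assignment.foreach { case (partition, replicas) =>
        require(replicas.distinct.size == replicas.size,
          s"duplicate brokers for partition $partition: $replicas")
      }
      val factors = assignment.values.map(_.size).toSet
      require(factors.size == 1, s"inconsistent replication factors: $factors")
    }

    validateAssignment(Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 3))) // passes
    // validateAssignment(Map(0 -> Seq(0, 0)))                    // fails: duplicates
    // validateAssignment(Map(0 -> Seq(0, 1), 1 -> Seq(0)))       // fails: mixed factor
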
diff --git a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala b/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
deleted file mode 100644
index 7cbad05..0000000
--- a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.admin
-
-import java.util.Properties
-import kafka.utils.ZkUtils
-
-@deprecated("This class is deprecated since AdminUtilities will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-class TestAdminUtils extends AdminUtilities {
-  override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties): Unit = {}
-  override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {new Properties}
-  override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties): Unit = {}
-  override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties): Unit = {}
-  override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {}
-}
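
The deleted stub shows the pattern tests should keep when porting off
AdminUtilities: depend on a narrow interface and plug in a no-op
implementation. A hypothetical analogue (the ConfigChanger trait below is
illustrative and not a Kafka API):

    import java.util.Properties

    trait ConfigChanger {
      def changeTopicConfig(topic: String, configs: Properties): Unit
      def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit
    }

    // No-op stand-in for tests that only need the interface to be satisfied.
    object NoOpConfigChanger extends ConfigChanger {
      override def changeTopicConfig(topic: String, configs: Properties): Unit = ()
      override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = ()
    }
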
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
deleted file mode 100755
index c0c3c6b..0000000
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.security.JaasUtils
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-
-@deprecated("Deprecated given that ZkUtils is deprecated", since = "2.0.0")
-class ZkUtilsTest extends ZooKeeperTestHarness {
-
-  val path = "/path"
-  var zkUtils: ZkUtils = _
-
-  @Before
-  override def setUp() {
-    super.setUp
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-  }
-
-  @After
-  override def tearDown() {
-    if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close(), this)
-    super.tearDown
-  }
-
-  @Test
-  def testSuccessfulConditionalDeletePath() {
-    // Given an existing path
-    zkUtils.createPersistentPath(path)
-    val (_, statAfterCreation) = zkUtils.readData(path)
-
-    // Deletion is successful when the version number matches
-    assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion))
-    val (optionalData, _) = zkUtils.readDataMaybeNull(path)
-    assertTrue("Node should be deleted", optionalData.isEmpty)
-
-    // Deletion is successful when the node does not exist too
-    assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, 0))
-  }
-
-  // Verify behaviour of ZkUtils.createSequentialPersistentPath since PIDManager relies on it
-  @Test
-  def testPersistentSequentialPath() {
-    // Given an existing path
-    zkUtils.createPersistentPath(path)
-
-    var result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
-
-    assertEquals("/path/sequence_0000000000", result)
-
-    result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
-
-    assertEquals("/path/sequence_0000000001", result)
-  }
-
-  @Test
-  def testAbortedConditionalDeletePath() {
-    // Given an existing path that gets updated
-    zkUtils.createPersistentPath(path)
-    val (_, statAfterCreation) = zkUtils.readData(path)
-    zkUtils.updatePersistentPath(path, "data")
-
-    // Deletion is aborted when the version number does not match
-    assertFalse("Deletion should be aborted", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion))
-    val (optionalData, _) = zkUtils.readDataMaybeNull(path)
-    assertTrue("Node should still be there", optionalData.isDefined)
-  }
-
-  @Test
-  def testClusterIdentifierJsonParsing() {
-    val clusterId = "test"
-    assertEquals(zkUtils.ClusterId.fromJson(zkUtils.ClusterId.toJson(clusterId)), clusterId)
-  }
-
-  @Test
-  def testGetAllPartitionsTopicWithoutPartitions() {
-    val topic = "testtopic"
-    // Create a regular topic and a topic without any partitions
-    zkUtils.createPersistentPath(ZkUtils.getTopicPartitionPath(topic, 0))
-    zkUtils.createPersistentPath(ZkUtils.getTopicPath("nopartitions"))
-
-    assertEquals(Set(TopicAndPartition(topic, 0)), zkUtils.getAllPartitions())
-  }
-
-  @Test
-  def testGetLeaderIsrAndEpochForPartition() {
-    val topic = "my-topic-test"
-    val partition = 0
-    val leader = 1
-    val leaderEpoch = 1
-    val controllerEpoch = 1
-    val isr = List(1, 2)
-    val topicPath = s"/brokers/topics/$topic/partitions/$partition/state"
-    val topicData = Json.legacyEncodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
-      "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr))
-    zkUtils.createPersistentPath(topicPath, topicData)
-
-    val leaderIsrAndControllerEpoch = zkUtils.getLeaderIsrAndEpochForPartition(topic, partition)
-    val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, 0),
-      controllerEpoch)
-    assertEquals(topicDataLeaderIsrAndControllerEpoch, leaderIsrAndControllerEpoch.get)
-    assertEquals(None, zkUtils.getLeaderIsrAndEpochForPartition(topic, partition + 1))
-  }
-
-  @Test
-  def testGetSequenceIdMethod() {
-    val path = "/test/seqid"
-    (1 to 10).foreach { seqid =>
-      assertEquals(seqid, zkUtils.getSequenceId(path))
-    }
-  }
-}
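
The conditional-delete behaviour verified by the deleted ZkUtilsTest is plain
ZooKeeper versioned-delete semantics, so equivalent coverage does not need
ZkUtils at all. A minimal sketch against the raw ZooKeeper client, assuming a
local ensemble on localhost:2181 (a real test would also wait for the session
to connect before issuing calls):

    import java.nio.charset.StandardCharsets

    import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}
    import org.apache.zookeeper.KeeperException.BadVersionException

    val zk = new ZooKeeper("localhost:2181", 30000, null)
    val path = zk.create("/path", Array.emptyByteArray,
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
    val stat = zk.exists(path, false)                       // version 0 after create
    zk.setData(path, "data".getBytes(StandardCharsets.UTF_8), stat.getVersion) // bumps to 1
    try zk.delete(path, stat.getVersion)                    // stale version 0: aborted
    catch { case _: BadVersionException => println("delete aborted, node kept") }
    zk.delete(path, stat.getVersion + 1)                    // matching version: succeeds
    zk.close()
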