You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/06/23 03:45:05 UTC

[kafka] branch trunk updated: KAFKA-8545: Remove legacy ZkUtils (#6948)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc27cbe  KAFKA-8545: Remove legacy ZkUtils (#6948)
fc27cbe is described below

commit fc27cbe4159eaf9195017bc0fa929b0b84216d61
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sat Jun 22 20:44:40 2019 -0700

    KAFKA-8545: Remove legacy ZkUtils (#6948)
    
    `ZkUtils` is not used by the broker, has been deprecated since
    2.0.0 and it was never intended as a public API. We should
    remove it along with `AdminUtils` methods that rely on it.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 core/src/main/scala/kafka/admin/AdminUtils.scala   | 431 +---------
 .../main/scala/kafka/common/KafkaException.scala   |   5 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 890 ---------------------
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   8 +-
 .../kafka/api/SaslPlainPlaintextConsumerTest.scala |   2 +-
 .../other/kafka/ReplicationQuotasTestRig.scala     |   6 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    | 213 -----
 .../scala/unit/kafka/admin/TestAdminUtils.scala    |  29 -
 .../ZkNodeChangeNotificationListenerTest.scala     |   4 +-
 .../controller/PartitionStateMachineTest.scala     |   4 +-
 .../test/scala/unit/kafka/utils/ZkUtilsTest.scala  | 133 ---
 11 files changed, 18 insertions(+), 1707 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index cd47969..c3748cf 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -17,54 +17,16 @@
 
 package kafka.admin
 
-import kafka.log.LogConfig
-import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils._
-import kafka.utils.ZkUtils._
 import java.util.Random
-import java.util.Properties
 
-import kafka.common.TopicAlreadyMarkedForDeletionException
-import org.apache.kafka.common.errors._
+import kafka.utils.Logging
+import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicationFactorException}
 
-import collection.{Map, Set, mutable, _}
-import scala.collection.JavaConverters._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.apache.kafka.common.internals.Topic
+import collection.{Map, mutable, _}
 
-@deprecated("This class is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-trait AdminUtilities {
-  def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
-  def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
-  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties)
-  def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties)
-
-  def changeConfigs(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties): Unit = {
-
-    def parseBroker(broker: String): Int = {
-      try broker.toInt
-      catch {
-        case _: NumberFormatException =>
-          throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
-      }
-    }
-
-    entityType match {
-      case ConfigType.Topic => changeTopicConfig(zkUtils, entityName, configs)
-      case ConfigType.Client => changeClientIdConfig(zkUtils, entityName, configs)
-      case ConfigType.User => changeUserOrUserClientIdConfig(zkUtils, entityName, configs)
-      case ConfigType.Broker => changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs)
-      case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}")
-    }
-  }
-
-  def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties
-}
-
-object AdminUtils extends Logging with AdminUtilities {
+object AdminUtils extends Logging {
   val rand = new Random
   val AdminClientId = "__admin_client"
-  val EntityConfigChangeZnodePrefix = "config_change_"
 
   /**
    * There are 3 goals of replica assignment:
@@ -256,391 +218,6 @@ object AdminUtils extends Logging with AdminUtilities {
       .groupBy { case (rack, _) => rack }
       .map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) }
   }
- /**
-  * Add partitions to existing topic with optional replica assignment
-  *
-  * @param zkUtils Zookeeper utilities
-  * @param topic Topic for adding partitions to
-  * @param existingAssignment A map from partition id to its assigned replicas
-  * @param allBrokers All brokers in the cluster
-  * @param numPartitions Number of partitions to be set
-  * @param replicaAssignment Manual replica assignment, or none
-  * @param validateOnly If true, validate the parameters without actually adding the partitions
-  * @return the updated replica assignment
-  */
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def addPartitions(zkUtils: ZkUtils,
-                    topic: String,
-                    existingAssignment: Map[Int, Seq[Int]],
-                    allBrokers: Seq[BrokerMetadata],
-                    numPartitions: Int = 1,
-                    replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
-                    validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
-    val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
-      throw new AdminOperationException(
-        s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
-          s"Assignment: $existingAssignment"))
-
-    val partitionsToAdd = numPartitions - existingAssignment.size
-    if (partitionsToAdd <= 0)
-      throw new InvalidPartitionsException(
-        s"The number of partitions for a topic can only be increased. " +
-          s"Topic $topic currently has ${existingAssignment.size} partitions, " +
-          s"$numPartitions would not be an increase.")
-
-    replicaAssignment.foreach { proposedReplicaAssignment =>
-      validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0,
-        allBrokers.map(_.id).toSet)
-    }
-
-    val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
-      val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
-      AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
-        startIndex, existingAssignment.size)
-    }
-    val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions
-    if (!validateOnly) {
-      info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
-        s"$proposedAssignmentForNewPartitions.")
-      // add the combined new list
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, proposedAssignment, update = true)
-    }
-    proposedAssignment
-
-  }
-
-  /**
-    * Parse a replica assignment string of the form:
-    * {{{
-    * broker_id_for_part1_replica1:broker_id_for_part1_replica2,
-    * broker_id_for_part2_replica1:broker_id_for_part2_replica2,
-    * ...
-    * }}}
-    */
-  def parseReplicaAssignment(replicaAssignmentsString: String, startPartitionId: Int): Map[Int, Seq[Int]] = {
-    val assignmentStrings = replicaAssignmentsString.split(",")
-    val assignmentMap = mutable.Map[Int, Seq[Int]]()
-    var partitionId = startPartitionId
-    for (assignmentString <- assignmentStrings) {
-      val brokerIds = assignmentString.split(":").map(_.trim.toInt).toSeq
-      assignmentMap.put(partitionId, brokerIds)
-      partitionId = partitionId + 1
-    }
-    assignmentMap
-  }
-
-  private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
-                                        existingAssignmentPartition0: Seq[Int],
-                                        availableBrokerIds: Set[Int]): Unit = {
-
-    replicaAssignment.foreach { case (partitionId, replicas) =>
-      if (replicas.isEmpty)
-        throw new InvalidReplicaAssignmentException(
-          s"Cannot have replication factor of 0 for partition id $partitionId.")
-      if (replicas.size != replicas.toSet.size)
-        throw new InvalidReplicaAssignmentException(
-          s"Duplicate brokers not allowed in replica assignment: " +
-            s"${replicas.mkString(", ")} for partition id $partitionId.")
-      if (!replicas.toSet.subsetOf(availableBrokerIds))
-        throw new BrokerNotAvailableException(
-          s"Some brokers specified for partition id $partitionId are not available. " +
-            s"Specified brokers: ${replicas.mkString(", ")}, " +
-            s"available brokers: ${availableBrokerIds.mkString(", ")}.")
-      partitionId -> replicas.size
-    }
-    val badRepFactors = replicaAssignment.collect {
-      case (partition, replicas) if replicas.size != existingAssignmentPartition0.size => partition -> replicas.size
-    }
-    if (badRepFactors.nonEmpty) {
-      val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId }
-      val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId }
-      val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
-      throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " +
-        s"partition 0 has ${existingAssignmentPartition0.size} while partitions [${partitions.mkString(", ")}] have " +
-        s"replication factors [${repFactors.mkString(", ")}], respectively.")
-    }
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def deleteTopic(zkUtils: ZkUtils, topic: String) {
-    if (topicExists(zkUtils, topic)) {
-      try {
-        zkUtils.createPersistentPath(getDeleteTopicPath(topic))
-      } catch {
-        case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
-          "topic %s is already marked for deletion".format(topic))
-        case e2: Throwable => throw new AdminOperationException(e2)
-      }
-    } else {
-      throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
-    }
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
-    zkUtils.pathExists(getTopicPath(topic))
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
-                         brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
-    val allBrokers = zkUtils.getAllBrokersInCluster()
-    val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
-    val brokersWithRack = brokers.filter(_.rack.nonEmpty)
-    if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
-      throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
-        " to make replica assignment without rack information.")
-    }
-    val brokerMetadatas = rackAwareMode match {
-      case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
-      case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
-        brokers.map(broker => BrokerMetadata(broker.id, None))
-      case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
-    }
-    brokerMetadatas.sortBy(_.id)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def createTopic(zkUtils: ZkUtils,
-                  topic: String,
-                  partitions: Int,
-                  replicationFactor: Int,
-                  topicConfig: Properties = new Properties,
-                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
-    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
-    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def validateCreateOrUpdateTopic(zkUtils: ZkUtils,
-                                  topic: String,
-                                  partitionReplicaAssignment: Map[Int, Seq[Int]],
-                                  config: Properties,
-                                  update: Boolean): Unit = {
-    // validate arguments
-    Topic.validate(topic)
-
-    if (!update) {
-      if (topicExists(zkUtils, topic))
-        throw new TopicExistsException(s"Topic '$topic' already exists.")
-      else if (Topic.hasCollisionChars(topic)) {
-        val allTopics = zkUtils.getAllTopics()
-        // check again in case the topic was created in the meantime, otherwise the
-        // topic could potentially collide with itself
-        if (allTopics.contains(topic))
-          throw new TopicExistsException(s"Topic '$topic' already exists.")
-        val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _))
-        if (collidingTopics.nonEmpty) {
-          throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}")
-        }
-      }
-    }
-
-    if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1)
-      throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
-
-    partitionReplicaAssignment.values.foreach(reps =>
-      if (reps.size != reps.toSet.size)
-        throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
-    )
-
-
-    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
-    if (!update)
-      LogConfig.validate(config)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
-                                                     topic: String,
-                                                     partitionReplicaAssignment: Map[Int, Seq[Int]],
-                                                     config: Properties = new Properties,
-                                                     update: Boolean = false) {
-    validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)
-
-    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
-    if (!update) {
-      // write out the config if there is any, this isn't transactional with the partition assignments
-      writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
-    }
-
-    // create the partition assignment
-    writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
-    try {
-      val zkPath = getTopicPath(topic)
-      val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
-
-      if (!update) {
-        info(s"Topic creation $jsonPartitionData")
-        zkUtils.createPersistentPath(zkPath, jsonPartitionData)
-      } else {
-        info(s"Topic update $jsonPartitionData")
-        zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
-      }
-      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
-    } catch {
-      case _: ZkNodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
-      case e2: Throwable => throw new AdminOperationException(e2.toString)
-    }
-  }
-
-  /**
-   * Update the config for a client and create a change notification so the change will propagate to other brokers.
-   * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
-   * and <user> configs are not specified.
-   *
-   * @param zkUtils Zookeeper utilities used to write the config to ZK
-   * @param sanitizedClientId: The sanitized clientId for which configs are being changed
-   * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
-   *                 existing configs need to be deleted, it should be done prior to invoking this API
-   *
-   */
-   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) {
-    DynamicConfig.Client.validate(configs)
-    changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
-  }
-
-  /**
-   * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
-   * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
-   * value to be applied if a more specific override is not configured.
-   *
-   * @param zkUtils Zookeeper utilities used to write the config to ZK
-   * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
-   * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
-   *                 existing configs need to be deleted, it should be done prior to invoking this API
-   *
-   */
-   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
-    if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
-      DynamicConfig.Client.validate(configs)
-    else
-      DynamicConfig.User.validate(configs)
-    changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def validateTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {
-    Topic.validate(topic)
-    if (!topicExists(zkUtils, topic))
-      throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
-    // remove the topic overrides
-    LogConfig.validate(configs)
-  }
-
-  /**
-   * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
-   *
-   * @param zkUtils Zookeeper utilities used to write the config to ZK
-   * @param topic: The topic for which configs are being changed
-   * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
-   *                 existing configs need to be deleted, it should be done prior to invoking this API
-   *
-   */
-   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
-    validateTopicConfig(zkUtils, topic, configs)
-    changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
-  }
-
-  /**
-    * Override the broker config on some set of brokers. These overrides will be persisted between sessions, and will
-    * override any defaults entered in the broker's config files
-    *
-    * @param zkUtils: Zookeeper utilities used to write the config to ZK
-    * @param brokers: The list of brokers to apply config changes to
-    * @param configs: The config to change, as properties
-    */
-   @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
-    DynamicConfig.Broker.validate(configs)
-    brokers.foreach { broker =>
-      changeEntityConfig(zkUtils, ConfigType.Broker, broker.toString, configs)
-    }
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
-    val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
-    val entityConfigPath = getEntityConfigPath(rootEntityType, fullSanitizedEntityName)
-    // write the new config--may not exist if there were previously no overrides
-    writeEntityConfig(zkUtils, entityConfigPath, configs)
-
-    // create the change notification
-    val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
-    val content = Json.legacyEncodeAsString(getConfigChangeZnodeData(sanitizedEntityPath))
-    zkUtils.createSequentialPersistentPath(seqNode, content)
-  }
-
-  def getConfigChangeZnodeData(sanitizedEntityPath: String): Map[String, Any] = {
-    Map("version" -> 2, "entity_path" -> sanitizedEntityPath)
-  }
-
-  /**
-   * Write out the entity config to zk, if there is any
-   */
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
-    val map = Map("version" -> 1, "config" -> config.asScala)
-    zkUtils.updatePersistentPath(entityPath, Json.legacyEncodeAsString(map))
-  }
-
-  /**
-   * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
-   * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
-   */
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
-    val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
-    // readDataMaybeNull returns Some(null) if the path exists, but there is no data
-    val str = zkUtils.readDataMaybeNull(entityConfigPath)._1.orNull
-    val props = new Properties()
-    if (str != null) {
-      Json.parseFull(str).foreach { jsValue =>
-        val jsObject = jsValue.asJsonObjectOption.getOrElse {
-          throw new IllegalArgumentException(s"Unexpected value in config: $str, entity_config_path: $entityConfigPath")
-        }
-        require(jsObject("version").to[Int] == 1)
-        val config = jsObject.get("config").flatMap(_.asJsonObjectOption).getOrElse {
-          throw new IllegalArgumentException(s"Invalid $entityConfigPath config: $str")
-        }
-        config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) }
-      }
-    }
-    props
-  }
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
-    zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
-    zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
-
-  @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-  def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
-    def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
-      val root = rootPath match {
-        case Some(path) => rootEntityType + '/' + path
-        case None => rootEntityType
-      }
-      val entityNames = zkUtils.getAllEntitiesWithConfig(root)
-      rootPath match {
-        case Some(path) => entityNames.map(entityName => path + '/' + entityName)
-        case None => entityNames
-      }
-    }
-    entityPaths(zkUtils, None)
-      .flatMap(entity => entityPaths(zkUtils, Some(entity + '/' + childEntityType)))
-      .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, rootEntityType, entityPath))).toMap
-  }
 
   private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
     val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
diff --git a/core/src/main/scala/kafka/common/KafkaException.scala b/core/src/main/scala/kafka/common/KafkaException.scala
index 61b3ba3..9c34dd9 100644
--- a/core/src/main/scala/kafka/common/KafkaException.scala
+++ b/core/src/main/scala/kafka/common/KafkaException.scala
@@ -19,9 +19,8 @@ package kafka.common
 /**
  * Usage of this class is discouraged. Use org.apache.kafka.common.KafkaException instead.
  *
- * This class will be removed once ZkUtils and the kafka.security.auth classes are removed.
- * The former is internal, but widely used, so we are leaving it in the codebase for now.
-*/
+ * This class will be removed once kafka.security.auth classes are removed.
+ */
 class KafkaException(message: String, t: Throwable) extends RuntimeException(message, t) {
   def this(message: String) = this(message, null)
   def this(t: Throwable) = this("", t)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
deleted file mode 100644
index dd85090..0000000
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ /dev/null
@@ -1,890 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.nio.charset.StandardCharsets
-
-import kafka.admin._
-import kafka.api.LeaderAndIsr
-import kafka.cluster._
-import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
-import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
-import kafka.zk.{BrokerIdZNode, ReassignPartitionsZNode, ZkData}
-import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
-import org.I0Itec.zkclient.serialize.ZkSerializer
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
-import org.apache.kafka.common.config.ConfigException
-import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.ZooDefs
-
-import scala.collection._
-import scala.collection.JavaConverters._
-import org.apache.kafka.common.TopicPartition
-
-@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " +
-  "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0")
-object ZkUtils {
-
-  private val UseDefaultAcls = new java.util.ArrayList[ACL]
-
-  // Important: it is necessary to add any new top level Zookeeper path here
-  val AdminPath = "/admin"
-  val BrokersPath = "/brokers"
-  val ClusterPath = "/cluster"
-  val ConfigPath = "/config"
-  val ControllerPath = "/controller"
-  val ControllerEpochPath = "/controller_epoch"
-  val IsrChangeNotificationPath = "/isr_change_notification"
-  val LogDirEventNotificationPath = "/log_dir_event_notification"
-  val KafkaAclPath = "/kafka-acl"
-  val KafkaAclChangesPath = "/kafka-acl-changes"
-
-  val ConsumersPath = "/consumers"
-  val ClusterIdPath = s"$ClusterPath/id"
-  val BrokerIdsPath = s"$BrokersPath/ids"
-  val BrokerTopicsPath = s"$BrokersPath/topics"
-  val ReassignPartitionsPath = s"$AdminPath/reassign_partitions"
-  val DeleteTopicsPath = s"$AdminPath/delete_topics"
-  val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election"
-  val BrokerSequenceIdPath = s"$BrokersPath/seqid"
-  val ConfigChangesPath = s"$ConfigPath/changes"
-  val ConfigUsersPath = s"$ConfigPath/users"
-  val ConfigBrokersPath = s"$ConfigPath/brokers"
-  val ProducerIdBlockPath = "/latest_producer_id_block"
-
-  val SecureZkRootPaths = ZkData.SecureRootPaths
-
-  val SensitiveZkRootPaths = ZkData.SensitiveRootPaths
-
-  def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
-    val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
-    new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
-  }
-
-  /*
-   * Used in tests
-   */
-  def apply(zkClient: ZkClient, isZkSecurityEnabled: Boolean): ZkUtils = {
-    new ZkUtils(zkClient, null, isZkSecurityEnabled)
-  }
-
-  def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
-    val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
-    zkClient
-  }
-
-  def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): (ZkClient, ZkConnection) = {
-    val zkConnection = new ZkConnection(zkUrl, sessionTimeout)
-    val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer)
-    (zkClient, zkConnection)
-  }
-
-  def sensitivePath(path: String): Boolean = ZkData.sensitivePath(path)
-
-  def defaultAcls(isSecure: Boolean, path: String): java.util.List[ACL] = ZkData.defaultAcls(isSecure, path).asJava
-
-  def maybeDeletePath(zkUrl: String, dir: String) {
-    try {
-      val zk = createZkClient(zkUrl, 30*1000, 30*1000)
-      zk.deleteRecursive(dir)
-      zk.close()
-    } catch {
-      case _: Throwable => // swallow
-    }
-  }
-
-  /*
-   * Get calls that only depend on static paths
-   */
-  def getTopicPath(topic: String): String = {
-    ZkUtils.BrokerTopicsPath + "/" + topic
-  }
-
-  def getTopicPartitionsPath(topic: String): String = {
-    getTopicPath(topic) + "/partitions"
-  }
-
-  def getTopicPartitionPath(topic: String, partitionId: Int): String =
-    getTopicPartitionsPath(topic) + "/" + partitionId
-
-  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
-    getTopicPartitionPath(topic, partitionId) + "/" + "state"
-
-  def getEntityConfigRootPath(entityType: String): String =
-    ZkUtils.ConfigPath + "/" + entityType
-
-  def getEntityConfigPath(entityType: String, entity: String): String =
-    getEntityConfigRootPath(entityType) + "/" + entity
-
-  def getEntityConfigPath(entityPath: String): String =
-    ZkUtils.ConfigPath + "/" + entityPath
-
-  def getDeleteTopicPath(topic: String): String =
-    DeleteTopicsPath + "/" + topic
-
-  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
-    val utf8Bytes = jsonData.getBytes(StandardCharsets.UTF_8)
-    val assignments = ReassignPartitionsZNode.decode(utf8Bytes) match {
-      case Left(e) => throw e
-      case Right(result) => result
-    }
-
-    assignments.map { case (tp, p) => (new TopicAndPartition(tp), p) }
-  }
-
-  def controllerZkData(brokerId: Int, timestamp: Long): String = {
-    Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
-  }
-
-  def preferredReplicaLeaderElectionZkData(partitions: scala.collection.Set[TopicAndPartition]): String = {
-    Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))))
-  }
-
-  def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
-    Json.legacyEncodeAsString(Map(
-      "version" -> 1,
-      "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) =>
-        Map(
-          "topic" -> topic,
-          "partition" -> partition,
-          "replicas" -> replicas
-        )
-      }
-    ))
-  }
-
-  def getReassignmentJson(partitionsToBeReassigned: Map[TopicPartition, Seq[Int]]): String = {
-    Json.encodeAsString(Map(
-      "version" -> 1,
-      "partitions" -> partitionsToBeReassigned.map { case (tp, replicas) =>
-        Map(
-          "topic" -> tp.topic,
-          "partition" -> tp.partition,
-          "replicas" -> replicas.asJava
-        ).asJava
-      }.asJava
-    ).asJava)
-  }
-}
-
-/**
- * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead.
- */
-@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " +
-  "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0")
-class ZkUtils(val zkClient: ZkClient,
-              val zkConnection: ZkConnection,
-              val isSecure: Boolean) extends Logging {
-  import ZkUtils._
-
-  // These are persistent ZK paths that should exist on kafka broker startup.
-  val persistentZkPaths = ZkData.PersistentZkPaths
-
-  // Visible for testing
-  val zkPath = new ZkPath(zkClient)
-
-  def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path)
-
-  def getController(): Int = {
-    readDataMaybeNull(ControllerPath)._1 match {
-      case Some(controller) => parseControllerId(controller)
-      case None => throw new KafkaException("Controller doesn't exist")
-    }
-  }
-
-  def parseControllerId(controllerInfoString: String): Int = {
-    try {
-      Json.parseFull(controllerInfoString) match {
-        case Some(js) => js.asJsonObject("brokerid").to[Int]
-        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
-      }
-    } catch {
-      case _: Throwable =>
-        // It may be due to an incompatible controller register version
-        warn("Failed to parse the controller info as json. "
-          + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
-        try controllerInfoString.toInt
-        catch {
-          case t: Throwable => throw new KafkaException(s"Failed to parse the controller info: $controllerInfoString. This is neither the new or the old format.", t)
-        }
-    }
-  }
-
-  /* Represents a cluster identifier. Stored in Zookeeper in JSON format: {"version" -> "1", "id" -> id } */
-  object ClusterId {
-
-    def toJson(id: String) = {
-      Json.legacyEncodeAsString(Map("version" -> "1", "id" -> id))
-    }
-
-    def fromJson(clusterIdJson: String): String = {
-      Json.parseFull(clusterIdJson).map(_.asJsonObject("id").to[String]).getOrElse {
-        throw new KafkaException(s"Failed to parse the cluster id json $clusterIdJson")
-      }
-    }
-  }
-
-  def getClusterId: Option[String] =
-    readDataMaybeNull(ClusterIdPath)._1.map(ClusterId.fromJson)
-
-  def createOrGetClusterId(proposedClusterId: String): String = {
-    try {
-      createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId))
-      proposedClusterId
-    } catch {
-      case _: ZkNodeExistsException =>
-        getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper."))
-    }
-  }
-
-  def getSortedBrokerList(): Seq[Int] =
-    getChildren(BrokerIdsPath).map(_.toInt).sorted
-
-  def getAllBrokersInCluster(): Seq[Broker] = {
-    val brokerIds = getChildrenParentMayNotExist(BrokerIdsPath).sorted
-    brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get)
-  }
-
-  def getLeaderAndIsrForPartition(topic: String, partition: Int): Option[LeaderAndIsr] = {
-    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val (leaderAndIsrOpt, stat) = readDataMaybeNull(leaderAndIsrPath)
-    debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
-    leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat).map(_.leaderAndIsr))
-  }
-
-  private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr).flatMap { js =>
-      val leaderIsrAndEpochInfo = js.asJsonObject
-      val leader = leaderIsrAndEpochInfo("leader").to[Int]
-      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
-      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
-      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
-      val zkPathVersion = stat.getVersion
-      trace(s"Leader $leader, Epoch $epoch, Isr $isr, Zk path version $zkPathVersion for leaderAndIsrPath $path")
-      Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}
-  }
-
-  def setupCommonPaths() {
-    for(path <- persistentZkPaths)
-      makeSurePersistentPathExists(path)
-  }
-
-  def getLeaderForPartition(topic: String, partition: Int): Option[Int] = {
-    readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1.flatMap { leaderAndIsr =>
-      Json.parseFull(leaderAndIsr).map(_.asJsonObject("leader").to[Int])
-    }
-  }
-
-  /**
-   * This API should read the epoch in the ISR path. It is sufficient to read the epoch in the ISR path, since if the
-   * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
-   * other broker will retry becoming leader with the same new epoch value.
-   */
-  def getEpochForPartition(topic: String, partition: Int): Int = {
-    readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1 match {
-      case Some(leaderAndIsr) =>
-        Json.parseFull(leaderAndIsr) match {
-          case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for partition [%s,%d] is invalid".format(topic, partition))
-          case Some(js) => js.asJsonObject("leader_epoch").to[Int]
-        }
-      case None => throw new NoEpochForPartitionException("No epoch, ISR path for partition [%s,%d] is empty"
-        .format(topic, partition))
-    }
-  }
-
-  /** returns a sequence id generated by updating BrokerSequenceIdPath in Zk.
-    * users can provide brokerId in the config , inorder to avoid conflicts between zk generated
-    * seqId and config.brokerId we increment zk seqId by KafkaConfig.MaxReservedBrokerId.
-    */
-  def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = {
-    getSequenceId(BrokerSequenceIdPath) + MaxReservedBrokerId
-  }
-
-  /**
-   * Gets the in-sync replicas (ISR) for a specific topic and partition
-   */
-  def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
-    val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
-    leaderAndIsrOpt match {
-      case Some(leaderAndIsr) =>
-        Json.parseFull(leaderAndIsr) match {
-          case Some(js) => js.asJsonObject("isr").to[Seq[Int]]
-          case None => Seq.empty[Int]
-        }
-      case None => Seq.empty[Int]
-    }
-  }
-
-  /**
-   * Gets the assigned replicas (AR) for a specific topic and partition
-   */
-  def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
-    val seqOpt = for {
-      jsonPartitionMap <- readDataMaybeNull(getTopicPath(topic))._1
-      js <- Json.parseFull(jsonPartitionMap)
-      replicaMap <- js.asJsonObject.get("partitions")
-      seq <- replicaMap.asJsonObject.get(partition.toString)
-    } yield seq.to[Seq[Int]]
-    seqOpt.getOrElse(Seq.empty)
-  }
-
-  def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
-    Json.legacyEncodeAsString(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
-                    "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
-  }
-
-  /**
-   * Get JSON partition to replica map from zookeeper.
-   */
-  def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
-    Json.legacyEncodeAsString(Map("version" -> 1, "partitions" -> map))
-  }
-
-  /**
-   *  make sure a persistent path exists in ZK. Create the path if not exist.
-   */
-  def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = UseDefaultAcls) {
-    //Consumer path is kept open as different consumers will write under this node.
-    val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) {
-      ZooDefs.Ids.OPEN_ACL_UNSAFE
-    } else if (acls eq UseDefaultAcls) {
-      ZkUtils.defaultAcls(isSecure, path)
-    } else {
-      acls
-    }
-
-    if (!zkClient.exists(path))
-      zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
-  }
-
-  /**
-   *  create the parent path
-   */
-  private def createParentPath(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    val parentDir = path.substring(0, path.lastIndexOf('/'))
-    if (parentDir.length != 0) {
-      zkPath.createPersistent(parentDir, createParents = true, acl)
-    }
-  }
-
-  /**
-   * Create an ephemeral node with the given path and data. Create parents if necessary.
-   */
-  private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL]): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      zkPath.createEphemeral(path, data, acl)
-    } catch {
-      case _: ZkNoNodeException =>
-        createParentPath(path)
-        zkPath.createEphemeral(path, data, acl)
-    }
-  }
-
-  /**
-   * Create an ephemeral node with the given path and data.
-   * Throw NodeExistException if node already exists.
-   */
-  def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      createEphemeralPath(path, data, acl)
-    } catch {
-      case e: ZkNodeExistsException =>
-        // this can happen when there is connection loss; make sure the data is what we intend to write
-        var storedData: String = null
-        try {
-          storedData = readData(path)._1
-        } catch {
-          case _: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
-        }
-        if (storedData == null || storedData != data) {
-          info(s"conflict in $path data: $data stored data: $storedData")
-          throw e
-        } else {
-          // otherwise, the creation succeeded, return normally
-          info(s"$path exists with value $data during connection loss; this is ok")
-        }
-    }
-  }
-
-  /**
-   * Create a persistent node with the given path and data. Create parents if necessary.
-   */
-  def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      zkPath.createPersistent(path, data, acl)
-    } catch {
-      case _: ZkNoNodeException =>
-        createParentPath(path)
-        zkPath.createPersistent(path, data, acl)
-    }
-  }
-
-  def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    zkPath.createPersistentSequential(path, data, acl)
-  }
-
-  /**
-   * Update the value of a persistent node with the given path and data.
-   * create parent directory if necessary. Never throw NodeExistException.
-   * Return the updated path zkVersion
-   */
-  def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      zkClient.writeData(path, data)
-    } catch {
-      case _: ZkNoNodeException =>
-        createParentPath(path)
-        try {
-          zkPath.createPersistent(path, data, acl)
-        } catch {
-          case _: ZkNodeExistsException =>
-            zkClient.writeData(path, data)
-        }
-    }
-  }
-
-  /**
-   * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
-   * exist, the current version is not the expected version, etc.) return (false, -1)
-   *
-   * When there is a ConnectionLossException during the conditional update, zkClient will retry the update and may fail
-   * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one).
-   * In this case, we will run the optionalChecker to further check if the previous write did indeed succeeded.
-   */
-  def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int,
-    optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
-    try {
-      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
-      debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
-        .format(path, data, expectVersion, stat.getVersion))
-      (true, stat.getVersion)
-    } catch {
-      case e1: ZkBadVersionException =>
-        optionalChecker match {
-          case Some(checker) => checker(this, path, data)
-          case _ =>
-            debug("Checker method is not passed skipping zkData match")
-            debug("Conditional update of path %s with data %s and expected version %d failed due to %s"
-              .format(path, data,expectVersion, e1.getMessage))
-            (false, -1)
-        }
-      case e2: Exception =>
-        debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e2.getMessage))
-        (false, -1)
-    }
-  }
-
-  /**
-   * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the current
-   * version is not the expected version, etc.) return (false, -1). If path doesn't exist, throws ZkNoNodeException
-   */
-  def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = {
-    try {
-      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
-      debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
-        .format(path, data, expectVersion, stat.getVersion))
-      (true, stat.getVersion)
-    } catch {
-      case nne: ZkNoNodeException => throw nne
-      case e: Exception =>
-        error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e.getMessage))
-        (false, -1)
-    }
-  }
-
-  /**
-   * Update the value of a ephemeral node with the given path and data.
-   * create parent directory if necessary. Never throw NodeExistException.
-   */
-  def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
-    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    try {
-      zkClient.writeData(path, data)
-    } catch {
-      case _: ZkNoNodeException =>
-        createParentPath(path)
-        zkPath.createEphemeral(path, data, acl)
-    }
-  }
-
-  def deletePath(path: String): Boolean = {
-    zkClient.delete(path)
-  }
-
-  /**
-    * Conditional delete the persistent path data, return true if it succeeds,
-    * false otherwise (the current version is not the expected version)
-    */
-   def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
-    try {
-      zkClient.delete(path, expectedVersion)
-      true
-    } catch {
-      case _: ZkBadVersionException => false
-    }
-  }
-
-  def deletePathRecursive(path: String) {
-    zkClient.deleteRecursive(path)
-  }
-
-  def subscribeDataChanges(path: String, listener: IZkDataListener): Unit =
-    zkClient.subscribeDataChanges(path, listener)
-
-  def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit =
-    zkClient.unsubscribeDataChanges(path, dataListener)
-
-  def subscribeStateChanges(listener: IZkStateListener): Unit =
-    zkClient.subscribeStateChanges(listener)
-
-  def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] =
-    Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala)
-
-  def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit =
-    zkClient.unsubscribeChildChanges(path, childListener)
-
-  def unsubscribeAll(): Unit =
-    zkClient.unsubscribeAll()
-
-  def readData(path: String): (String, Stat) = {
-    val stat: Stat = new Stat()
-    val dataStr: String = zkClient.readData[String](path, stat)
-    (dataStr, stat)
-  }
-
-  def readDataMaybeNull(path: String): (Option[String], Stat) = {
-    val stat = new Stat()
-    val dataAndStat = try {
-                        val dataStr = zkClient.readData[String](path, stat)
-                        (Some(dataStr), stat)
-                      } catch {
-                        case _: ZkNoNodeException =>
-                          (None, stat)
-                      }
-    dataAndStat
-  }
-
-  def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
-    val stat = new Stat()
-    try {
-      val data = zkClient.readData[String](path, stat)
-      (Option(data), stat.getVersion)
-    } catch {
-      case _: ZkNoNodeException => (None, stat.getVersion)
-    }
-  }
-
-  def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
-
-  def getChildrenParentMayNotExist(path: String): Seq[String] = {
-    try {
-      zkClient.getChildren(path).asScala
-    } catch {
-      case _: ZkNoNodeException => Nil
-    }
-  }
-
-  /**
-   * Check if the given path exists
-   */
-  def pathExists(path: String): Boolean = {
-    zkClient.exists(path)
-  }
-
-  def isTopicMarkedForDeletion(topic: String): Boolean = {
-    pathExists(getDeleteTopicPath(topic))
-  }
-
-  def getCluster(): Cluster = {
-    val cluster = new Cluster
-    val nodes = getChildrenParentMayNotExist(BrokerIdsPath)
-    for (node <- nodes) {
-      val brokerZKString = readData(BrokerIdsPath + "/" + node)._1
-      cluster.add(parseBrokerJson(node.toInt, brokerZKString))
-    }
-    cluster
-  }
-
-  private def parseBrokerJson(id: Int, jsonString: String): Broker = {
-    BrokerIdZNode.decode(id, jsonString.getBytes(StandardCharsets.UTF_8)).broker
-  }
-
-  def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
-    val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
-    for(topicAndPartition <- topicAndPartitions) {
-      getLeaderIsrAndEpochForPartition(topicAndPartition.topic, topicAndPartition.partition).foreach { leaderIsrAndControllerEpoch =>
-        ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
-      }
-    }
-    ret
-  }
-
-  private[utils] def getLeaderIsrAndEpochForPartition(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
-    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val (leaderAndIsrOpt, stat) = readDataMaybeNull(leaderAndIsrPath)
-    debug(s"Read leaderISR $leaderAndIsrOpt for $topic-$partition")
-    leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
-  }
-
-  def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
-    val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
-    topics.foreach { topic =>
-      readDataMaybeNull(getTopicPath(topic))._1.foreach { jsonPartitionMap =>
-        Json.parseFull(jsonPartitionMap).foreach { js =>
-          js.asJsonObject.get("partitions").foreach { partitionsJs =>
-            partitionsJs.asJsonObject.iterator.foreach { case (partition, replicas) =>
-              ret.put(TopicAndPartition(topic, partition.toInt), replicas.to[Seq[Int]])
-              debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
-            }
-          }
-        }
-      }
-    }
-    ret
-  }
-
-  def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
-    val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
-    topics.foreach { topic =>
-      val partitionMapOpt = for {
-        jsonPartitionMap <- readDataMaybeNull(getTopicPath(topic))._1
-        js <- Json.parseFull(jsonPartitionMap)
-        replicaMap <- js.asJsonObject.get("partitions")
-      } yield replicaMap.asJsonObject.iterator.map { case (k, v) => (k.toInt, v.to[Seq[Int]]) }.toMap
-      val partitionMap = partitionMapOpt.getOrElse(Map.empty)
-      debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
-      ret += (topic -> partitionMap)
-    }
-    ret
-  }
-
-  def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
-    getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
-      val topic = topicAndPartitionMap._1
-      val partitionMap = topicAndPartitionMap._2
-      debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
-      topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t)
-    }
-  }
-
-  def getTopicPartitionCount(topic: String): Option[Int] = {
-    val topicData = getPartitionAssignmentForTopics(Seq(topic))
-    if (topicData(topic).nonEmpty)
-      Some(topicData(topic).size)
-    else
-      None
-  }
-
-  def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext] = {
-    // read the partitions and their new replica list
-    val jsonPartitionMapOpt = readDataMaybeNull(ReassignPartitionsPath)._1
-    jsonPartitionMapOpt match {
-      case Some(jsonPartitionMap) =>
-        val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
-        reassignedPartitions.map { case (tp, newReplicas) =>
-          tp -> new ReassignedPartitionsContext(newReplicas, null)
-        }
-      case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
-    }
-  }
-
-  def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
-    val zkPath = ZkUtils.ReassignPartitionsPath
-    partitionsToBeReassigned.size match {
-      case 0 => // need to delete the /admin/reassign_partitions path
-        deletePath(zkPath)
-        info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
-      case _ =>
-        val jsonData = formatAsReassignmentJson(partitionsToBeReassigned)
-        try {
-          updatePersistentPath(zkPath, jsonData)
-          debug("Updated partition reassignment path with %s".format(jsonData))
-        } catch {
-          case _: ZkNoNodeException =>
-            createPersistentPath(zkPath, jsonData)
-            debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
-          case e2: Throwable => throw new AdminOperationException(e2.toString)
-        }
-    }
-  }
-
-  def getPartitionsUndergoingPreferredReplicaElection(): Set[TopicAndPartition] = {
-    // read the partitions and their new replica list
-    val jsonPartitionListOpt = readDataMaybeNull(PreferredReplicaLeaderElectionPath)._1
-    jsonPartitionListOpt match {
-      case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList).map(tp => new TopicAndPartition(tp))
-      case None => Set.empty[TopicAndPartition]
-    }
-  }
-
-  def deletePartition(brokerId: Int, topic: String) {
-    val brokerIdPath = BrokerIdsPath + "/" + brokerId
-    zkClient.delete(brokerIdPath)
-    val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
-    zkClient.delete(brokerPartTopicPath)
-  }
-
-  /**
-   * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
-   * or throws an exception if the broker dies before the query to zookeeper finishes
-   *
-   * @param brokerId The broker id
-   * @return An optional Broker object encapsulating the broker metadata
-   */
-  def getBrokerInfo(brokerId: Int): Option[Broker] = {
-    readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
-      case Some(brokerInfo) => Some(parseBrokerJson(brokerId, brokerInfo))
-      case None => None
-    }
-  }
-
-  /**
-    * This API produces a sequence number by creating / updating given path in zookeeper
-    * It uses the stat returned by the zookeeper and return the version. Every time
-    * client updates the path stat.version gets incremented. Starting value of sequence number is 1.
-    */
-  def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = {
-    val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
-    try {
-      writeToZk
-    } catch {
-      case _: ZkNoNodeException =>
-        makeSurePersistentPathExists(path, acl)
-        writeToZk
-    }
-  }
-
-  def getAllTopics(): Seq[String] = {
-    val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
-    if(topics == null)
-      Seq.empty[String]
-    else
-      topics
-  }
-
-  /**
-   * Returns all the entities whose configs have been overridden.
-   */
-  def getAllEntitiesWithConfig(entityType: String): Seq[String] = {
-    val entities = getChildrenParentMayNotExist(getEntityConfigRootPath(entityType))
-    if(entities == null)
-      Seq.empty[String]
-    else
-      entities
-  }
-
-  def getAllPartitions(): Set[TopicAndPartition] = {
-    val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
-    if (topics == null) Set.empty[TopicAndPartition]
-    else {
-      topics.flatMap { topic =>
-        // The partitions path may not exist if the topic is in the process of being deleted
-        getChildrenParentMayNotExist(getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _))
-      }.toSet
-    }
-  }
-
-  def close() {
-    zkClient.close()
-  }
-}
-
-private object ZKStringSerializer extends ZkSerializer {
-
-  @throws(classOf[ZkMarshallingError])
-  def serialize(data : Object): Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
-
-  @throws(classOf[ZkMarshallingError])
-  def deserialize(bytes : Array[Byte]): Object = {
-    if (bytes == null)
-      null
-    else
-      new String(bytes, "UTF-8")
-  }
-}
-
-object ZKConfig {
-  val ZkConnectProp = "zookeeper.connect"
-  val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
-  val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
-  val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
-}
-
-class ZKConfig(props: VerifiableProperties) {
-  import ZKConfig._
-
-  /** ZK host string */
-  val zkConnect = props.getString(ZkConnectProp)
-
-  /** zookeeper session timeout */
-  val zkSessionTimeoutMs = props.getInt(ZkSessionTimeoutMsProp, 6000)
-
-  /** the max time that the client waits to establish a connection to zookeeper */
-  val zkConnectionTimeoutMs = props.getInt(ZkConnectionTimeoutMsProp, zkSessionTimeoutMs)
-
-  /** how far a ZK follower can be behind a ZK leader */
-  val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
-}
-
-class ZkPath(zkClient: ZkClient) {
-
-  @volatile private var isNamespacePresent: Boolean = false
-
-  def checkNamespace() {
-    if (isNamespacePresent)
-      return
-
-    if (!zkClient.exists("/")) {
-      throw new ConfigException("Zookeeper namespace does not exist")
-    }
-    isNamespacePresent = true
-  }
-
-  def resetNamespaceCheckedState() {
-    isNamespacePresent = false
-  }
-
-  def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
-    checkNamespace()
-    zkClient.createPersistent(path, data, acls)
-  }
-
-  def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) {
-    checkNamespace()
-    zkClient.createPersistent(path, createParents, acls)
-  }
-
-  def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
-    checkNamespace()
-    zkClient.createEphemeral(path, data, acls)
-  }
-
-  def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = {
-    checkNamespace()
-    zkClient.createPersistentSequential(path, data, acls)
-  }
-}
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index a41eda2..572938e 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -45,12 +45,10 @@ import scala.collection.mutable
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
  *
- * This performs better than [[kafka.utils.ZkUtils]] and should replace it completely, eventually.
- *
  * Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.)
- * and returns instances of classes from the calling packages in some cases. This is not ideal, but it makes it
- * easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are
- * in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go.
+ * and returns instances of classes from the calling packages in some cases. This is not ideal, but it made it
+ * easier to migrate away from `ZkUtils` (since removed). We should revisit this. We should also consider whether a
+ * monolithic [[kafka.zk.ZkData]] is the way to go.
  */
 class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with
   Logging with KafkaMetricsGroup {
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 59c6d34..582b81d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -53,7 +53,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   }
 
   /**
-   * Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths
+   * Checks that everyone can access ZkData.SecureZkRootPaths and ZkData.SensitiveZkRootPaths
    * when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled.
    */
   @Test
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 20c28e7..4ec8665 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -18,6 +18,7 @@
 package kafka
 
 import java.io.{File, PrintWriter}
+import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, StandardOpenOption}
 import javax.imageio.ImageIO
 
@@ -25,7 +26,7 @@ import kafka.admin.ReassignPartitionsCommand
 import kafka.admin.ReassignPartitionsCommand.Throttle
 import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils._
-import kafka.utils.{Exit, Logging, TestUtils, ZkUtils}
+import kafka.utils.{Exit, Logging, TestUtils}
 import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
@@ -138,7 +139,8 @@ object ReplicationQuotasTestRig {
       val newAssignment = ReassignPartitionsCommand.generateAssignment(zkClient, brokers, json(topicName), true)._1
 
       val start = System.currentTimeMillis()
-      ReassignPartitionsCommand.executeAssignment(zkClient, None, ZkUtils.getReassignmentJson(newAssignment), Throttle(config.throttle))
+      ReassignPartitionsCommand.executeAssignment(zkClient, None,
+        new String(ReassignPartitionsZNode.encode(newAssignment), StandardCharsets.UTF_8), Throttle(config.throttle))
 
       //Await completion
       waitForReassignmentToComplete()
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
deleted file mode 100755
index fa4206b..0000000
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
-import org.apache.kafka.common.metrics.Quota
-import org.easymock.EasyMock
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import java.util.Properties
-
-import kafka.utils._
-import kafka.zk.{ConfigEntityZNode, ZooKeeperTestHarness}
-import kafka.utils.{Logging, TestUtils, ZkUtils}
-import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
-
-import scala.collection.{Map, immutable}
-import org.apache.kafka.common.security.JaasUtils
-import org.scalatest.Assertions.intercept
-
-import scala.collection.JavaConverters._
-
-@deprecated("This test has been deprecated and will be removed in a future release.", "1.1.0")
-class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
-
-  var servers: Seq[KafkaServer] = Seq()
-  var zkUtils: ZkUtils = null
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-  }
-
-  @After
-  override def tearDown() {
-    if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close(), this)
-    TestUtils.shutdownServers(servers)
-    super.tearDown()
-  }
-
-  @Test
-  def testManualReplicaAssignment() {
-    val brokers = List(0, 1, 2, 3, 4)
-    TestUtils.createBrokersInZk(zkClient, brokers)
-
-    // duplicate brokers
-    intercept[InvalidReplicaAssignmentException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,0)))
-    }
-
-    // inconsistent replication factor
-    intercept[InvalidReplicaAssignmentException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", Map(0->Seq(0,1), 1->Seq(0)))
-    }
-
-    // good assignment
-    val assignment = Map(0 -> List(0, 1, 2),
-                         1 -> List(1, 2, 3))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, "test", assignment)
-    val found = zkUtils.getPartitionAssignmentForTopics(Seq("test"))
-    assertEquals(assignment, found("test"))
-  }
-
-  @Test
-  def testTopicCreationInZK() {
-    val expectedReplicaAssignment = Map(
-      0  -> List(0, 1, 2),
-      1  -> List(1, 2, 3),
-      2  -> List(2, 3, 4),
-      3  -> List(3, 4, 0),
-      4  -> List(4, 0, 1),
-      5  -> List(0, 2, 3),
-      6  -> List(1, 3, 4),
-      7  -> List(2, 4, 0),
-      8  -> List(3, 0, 1),
-      9  -> List(4, 1, 2),
-      10 -> List(1, 2, 3),
-      11 -> List(1, 3, 4)
-    )
-    val leaderForPartitionMap = immutable.Map(
-      0 -> 0,
-      1 -> 1,
-      2 -> 2,
-      3 -> 3,
-      4 -> 4,
-      5 -> 0,
-      6 -> 1,
-      7 -> 2,
-      8 -> 3,
-      9 -> 4,
-      10 -> 1,
-      11 -> 1
-    )
-    val topic = "test"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
-    // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
-    // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => p -> zkUtils.getReplicasForPartition(topic, p)).toMap
-    assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
-    for(i <- 0 until actualReplicaList.size)
-      assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
-
-    intercept[TopicExistsException] {
-      // shouldn't be able to create a topic that already exists
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
-    }
-  }
-
-  @Test
-  def testTopicCreationWithCollision() {
-    val topic = "test.topic"
-    val collidingTopic = "test_topic"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
-    // create the topic
-    AdminUtils.createTopic(zkUtils, topic, 3, 1)
-
-    intercept[InvalidTopicException] {
-      // shouldn't be able to create a topic that collides
-      AdminUtils.createTopic(zkUtils, collidingTopic, 3, 1)
-    }
-  }
-
-  @Test
-  def testConcurrentTopicCreation() {
-    val topic = "test.topic"
-
-    // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
-    val zkMock: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false)
-    EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic"))
-    EasyMock.replay(zkMock)
-
-    intercept[TopicExistsException] {
-      AdminUtils.validateCreateOrUpdateTopic(zkMock, topic, Map.empty, new Properties, update = false)
-    }
-  }
-
-  /**
-   * This test simulates a client config change in ZK whose notification has been purged.
-   * Basically, it asserts that notifications are bootstrapped from ZK
-   */
-  @Test
-  def testBootstrapClientIdConfig() {
-    val clientId = "my-client"
-    val props = new Properties()
-    props.setProperty("producer_byte_rate", "1000")
-    props.setProperty("consumer_byte_rate", "2000")
-
-    // Write config without notification to ZK.
-    val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000")
-    val map = Map("version" -> 1, "config" -> configMap.asJava)
-    zkUtils.updatePersistentPath(ConfigEntityZNode.path(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))
-
-    val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client)
-    assertEquals("Must have 1 overridden client config", 1, configInZk.size)
-    assertEquals(props, configInZk(clientId))
-
-    // Test that the existing clientId overrides are read
-    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
-    servers = Seq(server)
-    assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId))
-    assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId))
-  }
-
-  @Test
-  def testGetBrokerMetadatas() {
-    // broker 4 has no rack information
-    val brokerList = 0 to 5
-    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3")
-    val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet))
-    TestUtils.createBrokersInZk(brokerMetadatas, zkClient)
-
-    val processedMetadatas1 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Disabled)
-    assertEquals(brokerList, processedMetadatas1.map(_.id))
-    assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack))
-
-    val processedMetadatas2 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Safe)
-    assertEquals(brokerList, processedMetadatas2.map(_.id))
-    assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack))
-
-    intercept[AdminOperationException] {
-      AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
-    }
-
-    val partialList = List(0, 1, 2, 3, 5)
-    val processedMetadatas3 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced, Some(partialList))
-    assertEquals(partialList, processedMetadatas3.map(_.id))
-    assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack))
-
-    val numPartitions = 3
-    AdminUtils.createTopic(zkUtils, "foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe)
-    val assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo"))
-    assertEquals(numPartitions, assignment.size)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala b/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
deleted file mode 100644
index 7cbad05..0000000
--- a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.admin
-
-import java.util.Properties
-import kafka.utils.ZkUtils
-
-@deprecated("This class is deprecated since AdminUtilities will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
-class TestAdminUtils extends AdminUtilities {
-  override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties): Unit = {}
-  override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {new Properties}
-  override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties): Unit = {}
-  override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties): Unit = {}
-  override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {}
-}
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 0462300..2572969 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -62,7 +62,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
      * There is no easy way to test purging. Even if we mock kafka time with MockTime, the purging compares kafka time
      * with the time stored in ZooKeeper stat and the embedded ZooKeeper server does not provide a way to mock time.
      * So to test purging we would have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check
-     * Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size). However even that the assertion
+     * Assert.assertEquals(1, KafkaZkClient.getChildren(seqNodeRoot).size). However even that the assertion
      * can fail as the second node can be deleted depending on how threads get scheduled.
      */
 
@@ -106,4 +106,4 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
 
     def setThrowSize(index: Int): Unit = throwSize = Option(index)
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 9b159fb..9482dfa 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -103,7 +103,7 @@ class PartitionStateMachineTest {
   }
 
   @Test
-  def testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates(): Unit = {
+  def testNewPartitionToOnlinePartitionTransitionZooKeeperClientExceptionFromCreateStates(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     controllerContext.putPartitionState(partition, NewPartition)
@@ -345,7 +345,7 @@ class PartitionStateMachineTest {
   }
 
   @Test
-  def testOfflinePartitionToOnlinePartitionTransitionZkUtilsExceptionFromStateLookup(): Unit = {
+  def testOfflinePartitionToOnlinePartitionTransitionZooKeeperClientExceptionFromStateLookup(): Unit = {
     controllerContext.setLiveBrokerAndEpochs(Map(TestUtils.createBrokerAndEpoch(brokerId, "host", 0)))
     controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     controllerContext.putPartitionState(partition, OfflinePartition)
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
deleted file mode 100755
index c0c3c6b..0000000
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.security.JaasUtils
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-
-@deprecated("Deprecated given that ZkUtils is deprecated", since = "2.0.0")
-class ZkUtilsTest extends ZooKeeperTestHarness {
-
-  val path = "/path"
-  var zkUtils: ZkUtils = _
-
-  @Before
-  override def setUp() {
-    super.setUp
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-  }
-
-  @After
-  override def tearDown() {
-    if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close(), this)
-    super.tearDown
-  }
-
-  @Test
-  def testSuccessfulConditionalDeletePath() {
-    // Given an existing path
-    zkUtils.createPersistentPath(path)
-    val (_, statAfterCreation) = zkUtils.readData(path)
-
-    // Deletion is successful when the version number matches
-    assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion))
-    val (optionalData, _) = zkUtils.readDataMaybeNull(path)
-    assertTrue("Node should be deleted", optionalData.isEmpty)
-
-    // Deletion is successful when the node does not exist too
-    assertTrue("Deletion should be successful", zkUtils.conditionalDeletePath(path, 0))
-  }
-
-  // Verify behaviour of ZkUtils.createSequentialPersistentPath since PIDManager relies on it
-  @Test
-  def testPersistentSequentialPath() {
-    // Given an existing path
-    zkUtils.createPersistentPath(path)
-
-    var result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
-
-    assertEquals("/path/sequence_0000000000", result)
-
-    result = zkUtils.createSequentialPersistentPath(path + "/sequence_")
-
-    assertEquals("/path/sequence_0000000001", result)
-  }
-
-  @Test
-  def testAbortedConditionalDeletePath() {
-    // Given an existing path that gets updated
-    zkUtils.createPersistentPath(path)
-    val (_, statAfterCreation) = zkUtils.readData(path)
-    zkUtils.updatePersistentPath(path, "data")
-
-    // Deletion is aborted when the version number does not match
-    assertFalse("Deletion should be aborted", zkUtils.conditionalDeletePath(path, statAfterCreation.getVersion))
-    val (optionalData, _) = zkUtils.readDataMaybeNull(path)
-    assertTrue("Node should still be there", optionalData.isDefined)
-  }
-
-  @Test
-  def testClusterIdentifierJsonParsing() {
-    val clusterId = "test"
-    assertEquals(zkUtils.ClusterId.fromJson(zkUtils.ClusterId.toJson(clusterId)), clusterId)
-  }
-
-  @Test
-  def testGetAllPartitionsTopicWithoutPartitions() {
-    val topic = "testtopic"
-    // Create a regular topic and a topic without any partitions
-    zkUtils.createPersistentPath(ZkUtils.getTopicPartitionPath(topic, 0))
-    zkUtils.createPersistentPath(ZkUtils.getTopicPath("nopartitions"))
-
-    assertEquals(Set(TopicAndPartition(topic, 0)), zkUtils.getAllPartitions())
-  }
-
-  @Test
-  def testGetLeaderIsrAndEpochForPartition() {
-    val topic = "my-topic-test"
-    val partition = 0
-    val leader = 1
-    val leaderEpoch = 1
-    val controllerEpoch = 1
-    val isr = List(1, 2)
-    val topicPath = s"/brokers/topics/$topic/partitions/$partition/state"
-    val topicData = Json.legacyEncodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
-      "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr))
-    zkUtils.createPersistentPath(topicPath, topicData)
-
-    val leaderIsrAndControllerEpoch = zkUtils.getLeaderIsrAndEpochForPartition(topic, partition)
-    val topicDataLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, 0),
-      controllerEpoch)
-    assertEquals(topicDataLeaderIsrAndControllerEpoch, leaderIsrAndControllerEpoch.get)
-    assertEquals(None, zkUtils.getLeaderIsrAndEpochForPartition(topic, partition + 1))
-  }
-
-  @Test
-  def testGetSequenceIdMethod() {
-    val path = "/test/seqid"
-    (1 to 10).foreach { seqid =>
-      assertEquals(seqid, zkUtils.getSequenceId(path))
-    }
-  }
-}