You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2018/11/30 05:41:49 UTC

[kafka] branch trunk updated: KAFKA-7259; Remove deprecated ZKUtils usage from ZkSecurityMigrator

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 944f24c  KAFKA-7259; Remove deprecated ZKUtils usage from ZkSecurityMigrator
944f24c is described below

commit 944f24cfdc8edd2b4ef89ed40a55a480e8f89632
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Nov 30 11:11:25 2018 +0530

    KAFKA-7259; Remove deprecated ZKUtils usage from ZkSecurityMigrator
    
    - Remove ZKUtils usage from various tests
    
    Author: Manikumar Reddy <ma...@gmail.com>
    
    Reviewers: Sriharsha Chintalapani <sr...@apache.org>, Ismael Juma <is...@juma.me.uk>, Satish Duggana <sa...@apache.org>, Jun Rao <ju...@gmail.com>, Ryanne Dolan <ry...@gmail.com>
    
    Closes #5480 from omkreddy/zkutils
---
 .../scala/kafka/admin/ZkSecurityMigrator.scala     |  36 ++--
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   1 -
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  85 ++++++---
 .../scala/kafka/zk/ZkSecurityMigratorUtils.scala   |  30 ++++
 .../kafka/api/SaslPlainPlaintextConsumerTest.scala |  11 +-
 .../SaslPlainSslEndToEndAuthorizationTest.scala    |   7 +-
 .../other/kafka/ReplicationQuotasTestRig.scala     |   4 +-
 .../scala/unit/kafka/admin/TopicCommandTest.scala  |   7 +-
 .../kafka/security/auth/ZkAuthorizationTest.scala  | 199 ++++++++++++---------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  32 ++--
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  29 +++
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |   2 +-
 12 files changed, 279 insertions(+), 164 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a833db4..5cab801 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -17,9 +17,11 @@
 
 package kafka.admin
 
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging, ZkUtils}
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging}
+import kafka.zk.{KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
 import org.I0Itec.zkclient.exception.ZkException
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
@@ -92,8 +94,9 @@ object ZkSecurityMigrator extends Logging {
     val zkUrl = opts.options.valueOf(opts.zkUrlOpt)
     val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
     val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
-    val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl)
-    val migrator = new ZkSecurityMigrator(zkUtils)
+    val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
+      Int.MaxValue, Time.SYSTEM)
+    val migrator = new ZkSecurityMigrator(zkClient)
     migrator.run()
   }
 
@@ -120,20 +123,21 @@ object ZkSecurityMigrator extends Logging {
   }
 }
 
-class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
+class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
+  private val zkSecurityMigratorUtils = new ZkSecurityMigratorUtils(zkClient)
   private val futures = new Queue[Future[String]]
 
-  private def setAcl(path: String, setPromise: Promise[String]) = {
+  private def setAcl(path: String, setPromise: Promise[String]): Unit = {
     info("Setting ACL for path %s".format(path))
-    zkUtils.zkConnection.getZookeeper.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, setPromise)
+    zkSecurityMigratorUtils.currentZooKeeper.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, setPromise)
   }
 
-  private def getChildren(path: String, childrenPromise: Promise[String]) = {
+  private def getChildren(path: String, childrenPromise: Promise[String]): Unit = {
     info("Getting children to set ACLs for path %s".format(path))
-    zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
+    zkSecurityMigratorUtils.currentZooKeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
   }
 
-  private def setAclIndividually(path: String) = {
+  private def setAclIndividually(path: String): Unit = {
     val setPromise = Promise[String]
     futures.synchronized {
       futures += setPromise.future
@@ -141,7 +145,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
     setAcl(path, setPromise)
   }
 
-  private def setAclsRecursively(path: String) = {
+  private def setAclsRecursively(path: String): Unit = {
     val setPromise = Promise[String]
     val childrenPromise = Promise[String]
     futures.synchronized {
@@ -157,7 +161,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
                       path: String,
                       ctx: Object,
                       children: java.util.List[String]) {
-      val zkHandle = zkUtils.zkConnection.getZookeeper
+      val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
       val promise = ctx.asInstanceOf[Promise[String]]
       Code.get(rc) match {
         case Code.OK =>
@@ -191,7 +195,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
                       path: String,
                       ctx: Object,
                       stat: Stat) {
-      val zkHandle = zkUtils.zkConnection.getZookeeper
+      val zkHandle = zkSecurityMigratorUtils.currentZooKeeper
       val promise = ctx.asInstanceOf[Promise[String]]
 
       Code.get(rc) match {
@@ -199,7 +203,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
           info("Successfully set ACLs for %s".format(path))
           promise success "done"
         case Code.CONNECTIONLOSS =>
-            zkHandle.setACL(path, zkUtils.defaultAcls(path), -1, SetACLCallback, ctx)
+            zkHandle.setACL(path, zkClient.defaultAcls(path).asJava, -1, SetACLCallback, ctx)
         case Code.NONODE =>
           warn("Znode is gone, it could be have been legitimately deleted: %s".format(path))
           promise success "done"
@@ -218,9 +222,9 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
   private def run(): Unit = {
     try {
       setAclIndividually("/")
-      for (path <- ZkUtils.SecureZkRootPaths) {
+      for (path <- ZkData.SecureRootPaths) {
         debug("Going to set ACL for %s".format(path))
-        zkUtils.makeSurePersistentPathExists(path)
+        zkClient.makeSurePersistentPathExists(path)
         setAclsRecursively(path)
       }
 
@@ -240,7 +244,7 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
       recurse()
 
     } finally {
-      zkUtils.close
+      zkClient.close
     }
   }
 }
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 4c8c4e1..281e920 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -20,7 +20,6 @@ package kafka.tools
 import java.io._
 import java.nio.ByteBuffer
 
-import joptsimple.OptionParser
 import kafka.coordinator.group.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
 import kafka.coordinator.transaction.TransactionLog
 import kafka.log._
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 4ad40ef..732a827 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -81,8 +81,8 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param data the znode data
    * @return the created path (including the appended monotonically increasing number)
    */
-  private[zk] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = {
-    val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+  private[kafka] def createSequentialPersistentPath(path: String, data: Array[Byte]): String = {
+    val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
     createResponse.name
@@ -137,7 +137,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
       try {
         val transaction = zooKeeperClient.createTransaction()
         transaction.create(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp),
-          acls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
+          defaultAcls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL)
         transaction.setData(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion)
         val results = transaction.commit()
         val setDataResult = results.get(1).asInstanceOf[SetDataResult]
@@ -214,7 +214,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val path = TopicPartitionStateZNode.path(partition)
       val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
-      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+      CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests.toSeq)
   }
@@ -237,7 +237,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    */
   def createControllerEpochRaw(epoch: Int): CreateResponse = {
     val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch),
-      acls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
+      defaultAcls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
     retryRequestUntilConnected(createRequest)
   }
 
@@ -384,7 +384,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   def createConfigChangeNotification(sanitizedEntityPath: String): Unit = {
     makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
     val path = ConfigEntityChangeNotificationSequenceZNode.createPath
-    val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow()
   }
@@ -803,7 +803,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
 
     def create(reassignmentData: Array[Byte]): CreateResponse = {
-      val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, acls(ReassignPartitionsZNode.path),
+      val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, defaultAcls(ReassignPartitionsZNode.path),
         CreateMode.PERSISTENT, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
       retryRequestUntilConnected(createRequest)
     }
@@ -832,9 +832,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
    */
   def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit = {
-    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.MatchAnyVersion,
-      zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
-    retryRequestUntilConnected(deleteRequest)
+    deletePath(ReassignPartitionsZNode.path, expectedControllerEpochZkVersion)
   }
 
   /**
@@ -1114,7 +1112,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   def createAclsForResourceIfNotExists(resource: Resource, aclsSet: Set[Acl]): (Boolean, Int) = {
     def create(aclData: Array[Byte]): CreateResponse = {
       val path = ResourceZNode.path(resource)
-      val createRequest = CreateRequest(path, aclData, acls(path), CreateMode.PERSISTENT)
+      val createRequest = CreateRequest(path, aclData, defaultAcls(path), CreateMode.PERSISTENT)
       retryRequestUntilConnected(createRequest)
     }
 
@@ -1134,7 +1132,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    */
   def createAclChangeNotification(resource: Resource): Unit = {
     val aclChange = ZkAclStore(resource.patternType).changeStore.createChangeNode(resource)
-    val createRequest = CreateRequest(aclChange.path, aclChange.bytes, acls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createRequest = CreateRequest(aclChange.path, aclChange.bytes, defaultAcls(aclChange.path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
   }
@@ -1241,11 +1239,21 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
   /**
    * Deletes the zk node recursively
-   * @param path
-   * @return  return true if it succeeds, false otherwise
+   * @param path path to delete
+   * @param expectedControllerEpochZkVersion expected controller epoch zkVersion.
+   * @param recursiveDelete enable recursive delete
+   * @throws KeeperException if there is an error while deleting the path
    */
-  def deletePath(path: String): Boolean = {
-    deleteRecursive(path)
+  def deletePath(path: String, expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion, recursiveDelete: Boolean = true): Unit = {
+    if (recursiveDelete)
+      deleteRecursive(path, expectedControllerEpochZkVersion)
+    else {
+      val deleteRequest = DeleteRequest(path, ZkVersion.MatchAnyVersion, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))
+      val deleteResponse = retryRequestUntilConnected(deleteRequest)
+      if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
+          throw deleteResponse.resultException.get
+      }
+    }
   }
 
   /**
@@ -1262,7 +1270,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    */
   def createTokenChangeNotification(tokenId: String): Unit = {
     val path = DelegationTokenChangeNotificationSequenceZNode.createPath
-    val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createRequest = CreateRequest(path, DelegationTokenChangeNotificationSequenceZNode.encode(tokenId), defaultAcls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.resultException.foreach(e => throw e)
   }
@@ -1283,7 +1291,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
     def create(tokenData: Array[Byte]): CreateResponse = {
       val path = DelegationTokenInfoZNode.path(token.tokenInfo().tokenId())
-      val createRequest = CreateRequest(path, tokenData, acls(path), CreateMode.PERSISTENT)
+      val createRequest = CreateRequest(path, tokenData, defaultAcls(path), CreateMode.PERSISTENT)
       retryRequestUntilConnected(createRequest)
     }
 
@@ -1440,6 +1448,31 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   /**
+    * Return the ACLs of the node of the given path
+    * @param path the given path for the node
+    * @return the sequence of ACLs set on the given node.
+    */
+  def getAcl(path: String): Seq[ACL] = {
+    val getAclRequest = GetAclRequest(path)
+    val getAclResponse = retryRequestUntilConnected(getAclRequest)
+    getAclResponse.resultCode match {
+      case Code.OK => getAclResponse.acl
+      case _ => throw getAclResponse.resultException.get
+    }
+  }
+
+  /**
+    * Sets the ACLs on the node of the given path
+    * @param path the given path for the node
+    * @param acl the given acl for the node
+    */
+  def setAcl(path: String, acl: Seq[ACL]): Unit = {
+    val setAclRequest = SetAclRequest(path, acl, ZkVersion.MatchAnyVersion)
+    val setAclResponse = retryRequestUntilConnected(setAclRequest)
+    setAclResponse.maybeThrow
+  }
+
+  /**
     * Create the cluster Id. If the cluster id already exists, return the current cluster id.
     * @return  cluster id
     */
@@ -1529,7 +1562,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
   }
 
-  private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
+  private[kafka] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
 
     def parentPath(path: String): String = {
       val indexOfLastSlash = path.lastIndexOf("/")
@@ -1538,7 +1571,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     }
 
     def createRecursive0(path: String): Unit = {
-      val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
+      val createRequest = CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT)
       var createResponse = retryRequestUntilConnected(createRequest)
       if (createResponse.resultCode == Code.NONODE) {
         createRecursive0(parentPath(path))
@@ -1551,7 +1584,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
       }
     }
 
-    val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT)
+    val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.PERSISTENT)
     var createResponse = retryRequestUntilConnected(createRequest)
 
     if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
@@ -1569,7 +1602,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   private def createTopicPartition(partitions: Seq[TopicPartition], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = {
     val createRequests = partitions.map { partition =>
       val path = TopicPartitionZNode.path(partition)
-      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+      CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests)
   }
@@ -1577,7 +1610,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int):Seq[CreateResponse] = {
     val createRequests = topics.map { topic =>
       val path = TopicPartitionsZNode.path(topic)
-      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion))
+      CreateRequest(path, null, defaultAcls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion))
     }
     retryRequestsUntilConnected(createRequests)
   }
@@ -1589,7 +1622,9 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     retryRequestsUntilConnected(getDataRequests)
   }
 
-  private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
+  def defaultAcls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
+
+  def secure: Boolean = isSecure
 
   private[zk] def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
     retryRequestsUntilConnected(Seq(request)).head
@@ -1655,7 +1690,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
     def create(): Code = {
-      val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
+      val createRequest = CreateRequest(path, data, defaultAcls(path), CreateMode.EPHEMERAL)
       val createResponse = retryRequestUntilConnected(createRequest)
       val createResultCode = createResponse.resultCode match {
         case code@ Code.OK =>
diff --git a/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala
new file mode 100644
index 0000000..31a7ba2
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/ZkSecurityMigratorUtils.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import org.apache.zookeeper.ZooKeeper
+
+/**
+ * This class should only be used in ZkSecurityMigrator tool.
+ * This class will be removed after we migrate ZkSecurityMigrator away from ZK's asynchronous API.
+ * @param kafkaZkClient the KafkaZkClient whose current ZooKeeper handle is exposed via currentZooKeeper
+ */
+class ZkSecurityMigratorUtils(val kafkaZkClient: KafkaZkClient) {
+
+  def currentZooKeeper: ZooKeeper = kafkaZkClient.currentZooKeeper
+
+}
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 5789d1a..c15a508 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -16,9 +16,8 @@ import java.io.File
 import java.util.Locale
 
 import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
+import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
 
@@ -29,6 +28,8 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
   private val kafkaServerJaasEntryName =
     s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+  // disable secure acls of zkClient in ZooKeeperTestHarness
+  override protected def zkAclsEnabled = Some(false)
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
   override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
@@ -36,7 +37,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
 
   @Before
   override def setUp(): Unit = {
-    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName))
     super.setUp()
   }
 
@@ -52,8 +53,6 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
    */
   @Test
   def testZkAclsDisabled() {
-    val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-    TestUtils.verifyUnsecureZkAcls(zkUtils)
-    CoreUtils.swallow(zkUtils.close(), this)
+    TestUtils.verifyUnsecureZkAcls(zkClient)
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index efb8c48..bbe0dd8 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -22,12 +22,11 @@ import javax.security.auth.Subject
 import javax.security.auth.login.AppConfigurationEntry
 
 import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.utils.{TestUtils}
 import kafka.utils.JaasTestUtils._
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth._
 import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
 import org.junit.Test
@@ -134,8 +133,6 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
    */
   @Test
   def testAcls() {
-    val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-    TestUtils.verifySecureZkAcls(zkUtils, 1)
-    CoreUtils.swallow(zkUtils.close(), this)
+    TestUtils.verifySecureZkAcls(zkClient, 1)
   }
 }
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 9c6ae0b..20c28e7 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -19,16 +19,16 @@ package kafka
 
 import java.io.{File, PrintWriter}
 import java.nio.file.{Files, StandardOpenOption}
-
 import javax.imageio.ImageIO
+
 import kafka.admin.ReassignPartitionsCommand
 import kafka.admin.ReassignPartitionsCommand.Throttle
-import org.apache.kafka.common.TopicPartition
 import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils._
 import kafka.utils.{Exit, Logging, TestUtils, ZkUtils}
 import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness}
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
 import org.jfree.chart.plot.PlotOrientation
 import org.jfree.chart.{ChartFactory, ChartFrame, JFreeChart}
 import org.jfree.data.xy.{XYSeries, XYSeriesCollection}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 5d2d873..5cfab90 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -20,10 +20,9 @@ import org.junit.Assert._
 import org.junit.Test
 import kafka.utils.Logging
 import kafka.utils.TestUtils
-import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode, ZooKeeperTestHarness}
 import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
-import kafka.utils.ZkUtils.getDeleteTopicPath
 import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.config.ConfigException
@@ -80,7 +79,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
 
     // delete the NormalTopic
     val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
-    val deletePath = getDeleteTopicPath(normalTopic)
+    val deletePath = DeleteTopicsTopicZNode.path(normalTopic)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deletePath))
     TopicCommand.deleteTopic(zkClient, deleteOpts)
     assertTrue("Delete path for topic should exist after deletion.", zkClient.pathExists(deletePath))
@@ -93,7 +92,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
 
     // try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
     val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
-    val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
+    val deleteOffsetTopicPath = DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
       TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 1cdbe4b..de5ae22 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -17,23 +17,32 @@
 
 package kafka.security.auth
 
+import java.nio.charset.StandardCharsets
+
 import kafka.admin.ZkSecurityMigrator
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
-import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
-import org.apache.kafka.common.KafkaException
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk._
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
-import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 import javax.security.auth.login.Configuration
 
+import kafka.api.ApiVersion
+import kafka.cluster.{Broker, EndPoint}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
 class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
   val authProvider = "zookeeper.authProvider.1"
-  var zkUtils: ZkUtils = null
 
   @Before
   override def setUp() {
@@ -41,13 +50,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     Configuration.setConfiguration(null)
     System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
     super.setUp()
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
   }
 
   @After
   override def tearDown() {
-    if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close(), this)
     super.tearDown()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     System.clearProperty(authProvider)
@@ -59,7 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * secure ACLs and authentication with ZooKeeper.
    */
   @Test
-  def testIsZkSecurityEnabled() {
+  def testIsZkSecurityEnabled(): Unit = {
     assertTrue(JaasUtils.isZkSecurityEnabled())
     Configuration.setConfiguration(null)
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
@@ -75,59 +81,76 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   }
 
   /**
-   * Exercises the code in ZkUtils. The goal is mainly
-   * to verify that the behavior of ZkUtils is correct
+   * Exercises the code in KafkaZkClient. The goal is mainly
+   * to verify that the behavior of KafkaZkClient is correct
    * when isSecure is set to true.
    */
   @Test
-  def testZkUtils() {
-    assertTrue(zkUtils.isSecure)
-    for (path <- zkUtils.persistentZkPaths) {
-      zkUtils.makeSurePersistentPathExists(path)
-      if (ZkUtils.sensitivePath(path)) {
-        val aclList = zkUtils.zkConnection.getAcl(path).getKey
+  def testKafkaZkClient(): Unit = {
+    assertTrue(zkClient.secure)
+    for (path <- ZkData.PersistentZkPaths) {
+      zkClient.makeSurePersistentPathExists(path)
+      if (ZkData.sensitivePath(path)) {
+        val aclList = zkClient.getAcl(path)
         assertEquals(s"Unexpected acl list size for $path", 1, aclList.size)
-        for (acl <- aclList.asScala)
+        for (acl <- aclList)
           assertTrue(TestUtils.isAclSecure(acl, sensitive = true))
-      } else if (!path.equals(ZkUtils.ConsumersPath)) {
-        val aclList = zkUtils.zkConnection.getAcl(path).getKey
+      } else if (!path.equals(ConsumerPathZNode.path)) {
+        val aclList = zkClient.getAcl(path)
         assertEquals(s"Unexpected acl list size for $path", 2, aclList.size)
-        for (acl <- aclList.asScala)
+        for (acl <- aclList)
           assertTrue(TestUtils.isAclSecure(acl, sensitive = false))
       }
     }
-    // Test that can create: createEphemeralPathExpectConflict
-    zkUtils.createEphemeralPathExpectConflict("/a", "")
-    verify("/a")
-    // Test that can create: createPersistentPath
-    zkUtils.createPersistentPath("/b")
-    verify("/b")
+
+    // Test that creates Ephemeral node
+    val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    zkClient.registerBroker(brokerInfo)
+    verify(brokerInfo.path)
+
+    // Test that creates persistent nodes
+    val topic1 = "topic1"
+    val assignment = Map(
+      new TopicPartition(topic1, 0) -> Seq(0, 1),
+      new TopicPartition(topic1, 1) -> Seq(0, 1),
+      new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
+    )
+
+    // create a topic assignment
+    zkClient.createTopicAssignment(topic1, assignment)
+    verify(TopicZNode.path(topic1))
+
     // Test that can create: createSequentialPersistentPath
-    val seqPath = zkUtils.createSequentialPersistentPath("/c", "")
+    val seqPath = zkClient.createSequentialPersistentPath("/c", "".getBytes(StandardCharsets.UTF_8))
     verify(seqPath)
-    // Test that can update: updateEphemeralPath
-    zkUtils.updateEphemeralPath("/a", "updated")
-    val valueA: String = zkUtils.zkClient.readData("/a")
-    assertTrue(valueA.equals("updated"))
-    // Test that can update: updatePersistentPath
-    zkUtils.updatePersistentPath("/b", "updated")
-    val valueB: String = zkUtils.zkClient.readData("/b")
-    assertTrue(valueB.equals("updated"))
 
-    info("Leaving testZkUtils")
+    // Test that can update Ephemeral node
+    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+    zkClient.updateBrokerInfo(updatedBrokerInfo)
+    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+    // Test that can update persistent nodes
+    val updatedAssignment = assignment - new TopicPartition(topic1, 2)
+    zkClient.setTopicAssignment(topic1, updatedAssignment)
+    assertEquals(updatedAssignment.size, zkClient.getTopicPartitionCount(topic1).get)
   }
 
+  private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+                               rack: Option[String] = None): BrokerInfo =
+    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+    (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
   /**
    * Tests the migration tool when making an unsecure
    * cluster secure.
    */
   @Test
-  def testZkMigration() {
-    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) 
+  def testZkMigration(): Unit = {
+    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
     try {
-      testMigration(zkConnect, unsecureZkUtils, zkUtils)
+      testMigration(zkConnect, unsecureZkClient, zkClient)
     } finally {
-      unsecureZkUtils.close()
+      unsecureZkClient.close()
     }
   }
 
@@ -136,12 +159,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * cluster unsecure.
    */
   @Test
-  def testZkAntiMigration() {
-    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+  def testZkAntiMigration(): Unit = {
+    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
     try {
-      testMigration(zkConnect, zkUtils, unsecureZkUtils)
+      testMigration(zkConnect, zkClient, unsecureZkClient)
     } finally {
-      unsecureZkUtils.close()
+      unsecureZkClient.close()
     }
   }
 
@@ -149,42 +172,42 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * Tests that the persistent paths cannot be deleted.
    */
   @Test
-  def testDelete() {
+  def testDelete(): Unit = {
     info(s"zkConnect string: $zkConnect")
     ZkSecurityMigrator.run(Array("--zookeeper.acl=secure", s"--zookeeper.connect=$zkConnect"))
     deleteAllUnsecure()
   }
 
   /**
-   * Tests that znodes cannot be deleted when the 
+   * Tests that znodes cannot be deleted when the
    * persistent paths have children.
    */
   @Test
-  def testDeleteRecursive() {
+  def testDeleteRecursive(): Unit = {
     info(s"zkConnect string: $zkConnect")
-    for (path <- ZkUtils.SecureZkRootPaths) {
+    for (path <- ZkData.SecureRootPaths) {
       info(s"Creating $path")
-      zkUtils.makeSurePersistentPathExists(path)
-      zkUtils.createPersistentPath(s"$path/fpjwashere", "")
+      zkClient.makeSurePersistentPathExists(path)
+      zkClient.createRecursive(s"$path/fpjwashere", "".getBytes(StandardCharsets.UTF_8))
     }
-    zkUtils.zkConnection.setAcl("/", zkUtils.defaultAcls("/"), -1)
+    zkClient.setAcl("/", zkClient.defaultAcls("/"))
     deleteAllUnsecure()
   }
-  
+
   /**
    * Tests the migration tool when chroot is being used.
    */
   @Test
   def testChroot(): Unit = {
     val zkUrl = zkConnect + "/kafka"
-    zkUtils.createPersistentPath("/kafka")
-    val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false)
-    val secureZkUtils = ZkUtils(zkUrl, 6000, 6000, true)
+    zkClient.createRecursive("/kafka")
+    val unsecureZkClient = KafkaZkClient(zkUrl, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+    val secureZkClient = KafkaZkClient(zkUrl, true, 6000, 6000, Int.MaxValue, Time.SYSTEM)
     try {
-      testMigration(zkUrl, unsecureZkUtils, secureZkUtils)
+      testMigration(zkUrl, unsecureZkClient, secureZkClient)
     } finally {
-      unsecureZkUtils.close()
-      secureZkUtils.close()
+      unsecureZkClient.close()
+      secureZkClient.close()
     }
   }
 
@@ -192,62 +215,62 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * Exercises the migration tool. It is used in these test cases:
    * testZkMigration, testZkAntiMigration, testChroot.
    */
-  private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) {
+  private def testMigration(zkUrl: String, firstZk: KafkaZkClient, secondZk: KafkaZkClient): Unit = {
     info(s"zkConnect string: $zkUrl")
-    for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) {
+    for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
       info(s"Creating $path")
       firstZk.makeSurePersistentPathExists(path)
       // Create a child for each znode to exercise the recurrent
       // traversal of the data tree
-      firstZk.createPersistentPath(s"$path/fpjwashere", "")
+      firstZk.createRecursive(s"$path/fpjwashere", "".getBytes(StandardCharsets.UTF_8))
     }
     // Getting security option to determine how to verify ACLs.
     // Additionally, we create the consumers znode (not in
     // securePersistentZkPaths) to make sure that we don't
     // add ACLs to it.
     val secureOpt: String =
-      if (secondZk.isSecure) {
-        firstZk.createPersistentPath(ZkUtils.ConsumersPath)
+      if (secondZk.secure) {
+        firstZk.createRecursive(ConsumerPathZNode.path)
         "secure"
       } else {
-        secondZk.createPersistentPath(ZkUtils.ConsumersPath)
+        secondZk.createRecursive(ConsumerPathZNode.path)
         "unsecure"
       }
     ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl"))
     info("Done with migration")
-    for (path <- ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths) {
-      val sensitive = ZkUtils.sensitivePath(path)
-      val listParent = secondZk.zkConnection.getAcl(path).getKey
-      assertTrue(path, isAclCorrect(listParent, secondZk.isSecure, sensitive))
+    for (path <- ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths) {
+      val sensitive = ZkData.sensitivePath(path)
+      val listParent = secondZk.getAcl(path)
+      assertTrue(path, isAclCorrect(listParent, secondZk.secure, sensitive))
 
       val childPath = path + "/fpjwashere"
-      val listChild = secondZk.zkConnection.getAcl(childPath).getKey
-      assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure, sensitive))
+      val listChild = secondZk.getAcl(childPath)
+      assertTrue(childPath, isAclCorrect(listChild, secondZk.secure, sensitive))
     }
     // Check consumers path.
-    val consumersAcl = firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath).getKey
-    assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false, false))
+    val consumersAcl = firstZk.getAcl(ConsumerPathZNode.path)
+    assertTrue(ConsumerPathZNode.path, isAclCorrect(consumersAcl, false, false))
   }
 
   /**
    * Verifies that the path has the appropriate secure ACL.
    */
-  private def verify(path: String): Boolean = {
-    val sensitive = ZkUtils.sensitivePath(path)
-    val list = zkUtils.zkConnection.getAcl(path).getKey
-    list.asScala.forall(TestUtils.isAclSecure(_, sensitive))
+  private def verify(path: String): Unit = {
+    val sensitive = ZkData.sensitivePath(path)
+    val list = zkClient.getAcl(path)
+    assertTrue(list.forall(TestUtils.isAclSecure(_, sensitive)))
   }
 
   /**
    * Verifies ACL.
    */
-  private def isAclCorrect(list: java.util.List[ACL], secure: Boolean, sensitive: Boolean): Boolean = {
+  private def isAclCorrect(list: Seq[ACL], secure: Boolean, sensitive: Boolean): Boolean = {
     val isListSizeCorrect =
       if (secure && !sensitive)
         list.size == 2
       else
         list.size == 1
-    isListSizeCorrect && list.asScala.forall(
+    isListSizeCorrect && list.forall(
       if (secure)
         TestUtils.isAclSecure(_, sensitive)
       else
@@ -260,14 +283,14 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    * This is used in the testDelete and testDeleteRecursive
    * test cases.
    */
-  private def deleteAllUnsecure() {
+  private def deleteAllUnsecure(): Unit = {
     System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
-    val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false)
+    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
     val result: Try[Boolean] = {
-      deleteRecursive(unsecureZkUtils, "/")
+      deleteRecursive(unsecureZkClient, "/")
     }
     // Clean up before leaving the test case
-    unsecureZkUtils.close()
+    unsecureZkClient.close()
     System.clearProperty(JaasUtils.ZK_SASL_CLIENT)
     
     // Fail the test if able to delete
@@ -280,13 +303,13 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   /**
    * Tries to delete znodes recursively
    */
-  private def deleteRecursive(zkUtils: ZkUtils, path: String): Try[Boolean] = {
+  private def deleteRecursive(zkClient: KafkaZkClient, path: String): Try[Boolean] = {
     info(s"Deleting $path")
     var result: Try[Boolean] = Success(true)
-    for (child <- zkUtils.getChildren(path))
+    for (child <- zkClient.getChildren(path))
       result = (path match {
-        case "/" => deleteRecursive(zkUtils, s"/$child")
-        case path => deleteRecursive(zkUtils, s"$path/$child")
+        case "/" => deleteRecursive(zkClient, s"/$child")
+        case path => deleteRecursive(zkClient, s"$path/$child")
       }) match {
         case Success(_) => result
         case Failure(e) => Failure(e)
@@ -297,7 +320,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
       // For all other paths, try to delete it
       case path =>
         try {
-          zkUtils.deletePath(path)
+          zkClient.deletePath(path, recursiveDelete = false)
           Failure(new Exception(s"Have been able to delete $path"))
         } catch {
           case _: Exception => result
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c3e7312..b5a7583 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,8 +26,8 @@ import java.security.cert.X509Certificate
 import java.time.Duration
 import java.util.{Collections, Properties}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-
 import javax.net.ssl.X509TrustManager
+
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log._
@@ -1132,30 +1132,30 @@ object TestUtils extends Logging {
     }
   }
 
-  private def secureZkPaths(zkUtils: ZkUtils): Seq[String] = {
+  private def secureZkPaths(zkClient: KafkaZkClient): Seq[String] = {
     def subPaths(path: String): Seq[String] = {
-      if (zkUtils.pathExists(path))
-        path +: zkUtils.getChildren(path).map(c => path + "/" + c).flatMap(subPaths)
+      if (zkClient.pathExists(path))
+        path +: zkClient.getChildren(path).map(c => path + "/" + c).flatMap(subPaths)
       else
         Seq.empty
     }
-    val topLevelPaths = ZkUtils.SecureZkRootPaths ++ ZkUtils.SensitiveZkRootPaths
+    val topLevelPaths = ZkData.SecureRootPaths ++ ZkData.SensitiveRootPaths
     topLevelPaths.flatMap(subPaths)
   }
 
   /**
    * Verifies that all secure paths in ZK are created with the expected ACL.
    */
-  def verifySecureZkAcls(zkUtils: ZkUtils, usersWithAccess: Int) {
-    secureZkPaths(zkUtils).foreach(path => {
-      if (zkUtils.pathExists(path)) {
-        val sensitive = ZkUtils.sensitivePath(path)
+  def verifySecureZkAcls(zkClient: KafkaZkClient, usersWithAccess: Int) {
+    secureZkPaths(zkClient).foreach(path => {
+      if (zkClient.pathExists(path)) {
+        val sensitive = ZkData.sensitivePath(path)
         // usersWithAccess have ALL access to path. For paths that are
         // not sensitive, world has READ access.
         val aclCount = if (sensitive) usersWithAccess else usersWithAccess + 1
-        val acls = zkUtils.zkConnection.getAcl(path).getKey
+        val acls = zkClient.getAcl(path)
         assertEquals(s"Invalid ACLs for $path $acls", aclCount, acls.size)
-        acls.asScala.foreach(acl => isAclSecure(acl, sensitive))
+        acls.foreach(acl => isAclSecure(acl, sensitive))
       }
     })
   }
@@ -1164,12 +1164,12 @@ object TestUtils extends Logging {
    * Verifies that secure paths in ZK have no access control. This is
    * the case when zookeeper.set.acl=false and no ACLs have been configured.
    */
-  def verifyUnsecureZkAcls(zkUtils: ZkUtils) {
-    secureZkPaths(zkUtils).foreach(path => {
-      if (zkUtils.pathExists(path)) {
-        val acls = zkUtils.zkConnection.getAcl(path).getKey
+  def verifyUnsecureZkAcls(zkClient: KafkaZkClient) {
+    secureZkPaths(zkClient).foreach(path => {
+      if (zkClient.pathExists(path)) {
+        val acls = zkClient.getAcl(path)
         assertEquals(s"Invalid ACLs for $path $acls", 1, acls.size)
-        acls.asScala.foreach(isAclUnsecure)
+        acls.foreach(isAclUnsecure)
       }
     })
   }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 6de9159..a8df342 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -44,6 +44,7 @@ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper._
 import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.data.Stat
 
 class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -543,6 +544,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createRecursive(path)
     zkClient.deletePath(path)
     assertFalse(zkClient.pathExists(path))
+
+    zkClient.createRecursive(path)
+    zkClient.deletePath("/a")
+    assertFalse(zkClient.pathExists(path))
+
+    zkClient.createRecursive(path)
+    zkClient.deletePath(path, recursiveDelete =  false)
+    assertFalse(zkClient.pathExists(path))
+    assertTrue(zkClient.pathExists("/a/b"))
   }
 
   @Test
@@ -1143,6 +1153,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(expectedConsumerGroupOffsetsPath, actualConsumerGroupOffsetsPath)
   }
 
+  @Test
+  def testAclMethods(): Unit = {
+    val mockPath = "/foo"
+
+    intercept[NoNodeException] {
+      zkClient.getAcl(mockPath)
+    }
+
+    intercept[NoNodeException] {
+      zkClient.setAcl(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala)
+    }
+
+    zkClient.createRecursive(mockPath)
+
+    zkClient.setAcl(mockPath, ZooDefs.Ids.READ_ACL_UNSAFE.asScala)
+
+    assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath))
+  }
+
   class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
     extends KafkaZkClient(zooKeeperClient, isSecure, time) {
     // Overwriting this method from the parent class to force the client to re-register the Broker.
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 8d34c48..2f75fa2 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -43,7 +43,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   val zkSessionTimeout = 6000
   val zkMaxInFlightRequests = Int.MaxValue
 
-  protected val zkAclsEnabled: Option[Boolean] = None
+  protected def zkAclsEnabled: Option[Boolean] = None
 
   var zkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null