You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/08 17:34:46 UTC

kafka git commit: KAFKA-5645; Use async ZookeeperClient in SimpleAclAuthorizer

Repository: kafka
Updated Branches:
  refs/heads/trunk 3735a6ca8 -> 9b44c3e7f


KAFKA-5645; Use async ZookeeperClient in SimpleAclAuthorizer

Author: Manikumar Reddy <ma...@gmail.com>

Reviewers: Onur Karaman <ok...@linkedin.com>, Jun Rao <ju...@gmail.com>

Closes #4155 from omkreddy/KAFKA-5645-ZK-AUTHORIZER-VERSION2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b44c3e7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b44c3e7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b44c3e7

Branch: refs/heads/trunk
Commit: 9b44c3e7f89b55528b12ba280007c804682beb0b
Parents: 3735a6c
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Nov 8 09:34:43 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 8 09:34:43 2017 -0800

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      | 109 ++++----
 .../kafka/controller/KafkaController.scala      |  10 +-
 .../main/scala/kafka/security/auth/Acl.scala    |  11 +-
 .../security/auth/SimpleAclAuthorizer.scala     |  98 ++-----
 .../kafka/server/DynamicConfigManager.scala     |  20 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  22 +-
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 259 ++++++++++++++++++-
 core/src/main/scala/kafka/zk/ZkData.scala       |  44 ++++
 .../scala/kafka/zookeeper/ZooKeeperClient.scala |  41 ++-
 .../ZkNodeChangeNotificationListenerTest.scala  |  21 +-
 .../unit/kafka/security/auth/AclTest.scala      |   8 +-
 .../security/auth/SimpleAclAuthorizerTest.scala |   6 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |   2 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 137 ++++++++--
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  10 +
 .../kafka/zookeeper/ZooKeeperClientTest.scala   |  36 +--
 16 files changed, 593 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 0e34c5a..f589430 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -16,12 +16,12 @@
  */
 package kafka.common
 
+import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.utils.{Logging, ZkUtils}
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.exception.ZkInterruptedException
-import org.I0Itec.zkclient.{IZkChildListener, IZkStateListener}
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{KafkaZkClient, StateChangeHandlers}
+import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
 import org.apache.kafka.common.utils.Time
 
 /**
@@ -38,60 +38,53 @@ trait NotificationHandler {
  * notificationHandler's processNotification() method with the child's data as argument. As part of processing these changes it also
  * purges any children with currentTime - createTime > changeExpirationMs.
  *
- * The caller/user of this class should ensure that they use zkUtils.subscribeStateChanges and call processAllNotifications
- * method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session
- * is terminated and reestablished any missed notification will be processed immediately.
- * @param zkUtils
+ * @param zkClient
  * @param seqNodeRoot
  * @param seqNodePrefix
  * @param notificationHandler
  * @param changeExpirationMs
  * @param time
  */
-class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
+class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
                                        private val seqNodeRoot: String,
                                        private val seqNodePrefix: String,
                                        private val notificationHandler: NotificationHandler,
                                        private val changeExpirationMs: Long = 15 * 60 * 1000,
                                        private val time: Time = Time.SYSTEM) extends Logging {
   private var lastExecutedChange = -1L
+  private val queue = new LinkedBlockingQueue[ChangeNotification]
+  private val thread = new ChangeEventProcessThread(s"$seqNodeRoot-event-process-thread")
   private val isClosed = new AtomicBoolean(false)
 
-  /**
-   * create seqNodeRoot and begin watching for any new children nodes.
-   */
   def init() {
-    zkUtils.makeSurePersistentPathExists(seqNodeRoot)
-    zkUtils.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
-    zkUtils.subscribeStateChanges(ZkStateChangeListener)
-    processAllNotifications()
+    zkClient.registerStateChangeHandler(ZkStateChangeHandler)
+    zkClient.registerZNodeChildChangeHandler(ChangeNotificationHandler)
+    addChangeNotification()
+    thread.start()
   }
 
   def close() = {
     isClosed.set(true)
+    zkClient.unregisterStateChangeHandler(ZkStateChangeHandler.name)
+    zkClient.unregisterZNodeChildChangeHandler(ChangeNotificationHandler.path)
+    queue.clear()
+    thread.shutdown()
   }
 
   /**
-   * Process all changes
-   */
-  def processAllNotifications() {
-    val changes = zkUtils.getChildren(seqNodeRoot)
-    processNotifications(changes.sorted)
-  }
-
-  /**
-   * Process the given list of notifications
+   * Process notifications
    */
-  private def processNotifications(notifications: Seq[String]) {
-    if (notifications.nonEmpty) {
-      info(s"Processing notification(s) to $seqNodeRoot")
-      try {
+  private def processNotifications() {
+    try {
+      val notifications = zkClient.getChildren(seqNodeRoot).sorted
+      if (notifications.nonEmpty) {
+        info(s"Processing notification(s) to $seqNodeRoot")
         val now = time.milliseconds
         for (notification <- notifications) {
           val changeId = changeNumber(notification)
           if (changeId > lastExecutedChange) {
             val changeZnode = seqNodeRoot + "/" + notification
-            val data = zkUtils.readDataMaybeNull(changeZnode)._1.orNull
+            val data = zkClient.getDataAndStat(changeZnode)._1.orNull
             if (data != null) {
               notificationHandler.processNotification(data)
             } else {
@@ -101,14 +94,22 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
           }
         }
         purgeObsoleteNotifications(now, notifications)
-      } catch {
-        case e: ZkInterruptedException =>
-          if (!isClosed.get)
-            throw e
       }
+    } catch {
+      case e: InterruptedException => if (!isClosed.get) error(s"Error while processing notification change for path = $seqNodeRoot", e)
+      case e: Exception => error(s"Error while processing notification change for path = $seqNodeRoot", e)
     }
   }
 
+  private def addChangeNotification(): Unit = {
+    if (!isClosed.get && queue.peek() == null)
+      queue.put(new ChangeNotification)
+  }
+
+  class ChangeNotification {
+    def process(): Unit = processNotifications
+  }
+
   /**
    * Purges expired notifications.
    *
@@ -118,11 +119,11 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
   private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
     for (notification <- notifications.sorted) {
       val notificationNode = seqNodeRoot + "/" + notification
-      val (data, stat) = zkUtils.readDataMaybeNull(notificationNode)
+      val (data, stat) = zkClient.getDataAndStat(notificationNode)
       if (data.isDefined) {
         if (now - stat.getCtime > changeExpirationMs) {
           debug(s"Purging change notification $notificationNode")
-          zkUtils.deletePath(notificationNode)
+          zkClient.deletePath(notificationNode)
         }
       }
     }
@@ -131,35 +132,19 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
   /* get the change number from a change notification znode */
   private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong
 
-  /**
-   * A listener that gets invoked when a node is created to notify changes.
-   */
-  object NodeChangeListener extends IZkChildListener {
-    override def handleChildChange(path: String, notifications: java.util.List[String]) {
-      try {
-        import scala.collection.JavaConverters._
-        if (notifications != null)
-          processNotifications(notifications.asScala.sorted)
-      } catch {
-        case e: Exception => error(s"Error processing notification change for path = $path and notification= $notifications :", e)
-      }
-    }
+  class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = queue.take().process
   }
 
-  object ZkStateChangeListener extends IZkStateListener {
-
-    override def handleNewSession() {
-      processAllNotifications
-    }
-
-    override def handleSessionEstablishmentError(error: Throwable) {
-      fatal("Could not establish session with ZooKeeper", error)
-    }
-
-    override def handleStateChanged(state: KeeperState) {
-      debug(s"New ZooKeeper state: ${state}")
-    }
+  object ChangeNotificationHandler extends ZNodeChildChangeHandler {
+    override val path: String = seqNodeRoot
+    override def handleChildChange(): Unit = addChangeNotification
   }
 
+  object ZkStateChangeHandler extends  StateChangeHandler {
+    override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
+    override def afterInitializingSession(): Unit = addChangeNotification
+    override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ade3ae4..aa5cf1f 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -25,9 +25,9 @@ import kafka.common._
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server._
 import kafka.utils._
-import kafka.zk._
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper.{ZNodeChangeHandler, ZNodeChildChangeHandler}
+import kafka.zk._
+import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
@@ -518,6 +518,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * elector
    */
   def startup() = {
+    zkClient.registerStateChangeHandler(new StateChangeHandler {
+      override val name: String = StateChangeHandlers.ControllerHandler
+      override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
+      override def afterInitializingSession(): Unit = newSession()
+      override def beforeInitializingSession(): Unit = expire()
+    })
     eventManager.put(Startup)
     eventManager.start()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/security/auth/Acl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index 1fbcfb1..4e2cba4 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -19,6 +19,7 @@ package kafka.security.auth
 
 import kafka.utils.Json
 import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.SecurityUtils
 
 object Acl {
   val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
@@ -34,7 +35,7 @@ object Acl {
 
   /**
    *
-   * @param aclJson
+   * @param bytes of acls json string
    *
    * <p>
       {
@@ -52,15 +53,15 @@ object Acl {
    *
    * @return
    */
-  def fromJson(aclJson: String): Set[Acl] = {
-    if (aclJson == null || aclJson.isEmpty)
+  def fromBytes(bytes: Array[Byte]): Set[Acl] = {
+    if (bytes == null || bytes.isEmpty)
       return collection.immutable.Set.empty[Acl]
 
-    Json.parseFull(aclJson).map(_.asJsonObject).map { js =>
+    Json.parseBytes(bytes).map(_.asJsonObject).map { js =>
       //the acl json version.
       require(js(VersionKey).to[Int] == CurrentVersion)
       js(AclsKey).asJsonArray.iterator.map(_.asJsonObject).map { itemJs =>
-        val principal = KafkaPrincipal.fromString(itemJs(PrincipalKey).to[String])
+        val principal = SecurityUtils.parseKafkaPrincipal(itemJs(PrincipalKey).to[String])
         val permissionType = PermissionType.fromString(itemJs(PermissionTypeKey).to[String])
         val host = itemJs(HostsKey).to[String]
         val operation = Operation.fromString(itemJs(OperationKey).to[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 3c949648..aa25653 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -18,18 +18,20 @@ package kafka.security.auth
 
 import java.util
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 
+import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import scala.collection.JavaConverters._
+import org.apache.kafka.common.utils.SecurityUtils
 import org.apache.log4j.Logger
 
+import scala.collection.JavaConverters._
 import scala.util.Random
 
 object SimpleAclAuthorizer {
@@ -38,39 +40,21 @@ object SimpleAclAuthorizer {
   val ZkUrlProp = "authorizer.zookeeper.url"
   val ZkConnectionTimeOutProp = "authorizer.zookeeper.connection.timeout.ms"
   val ZkSessionTimeOutProp = "authorizer.zookeeper.session.timeout.ms"
+  val ZkMaxInFlightRequests = "authorizer.zookeeper.max.in.flight.requests"
 
   //List of users that will be treated as super users and will have access to all the resources for all actions from all hosts, defaults to no super users.
   val SuperUsersProp = "super.users"
   //If set to true when no acls are found for a resource , authorizer allows access to everyone. Defaults to false.
   val AllowEveryoneIfNoAclIsFoundProp = "allow.everyone.if.no.acl.found"
 
-  /**
-   * The root acl storage node. Under this node there will be one child node per resource type (Topic, Cluster, Group).
-   * under each resourceType there will be a unique child for each resource instance and the data for that child will contain
-   * list of its acls as a json object. Following gives an example:
-   *
-   * <pre>
-   * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
-   * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
-   * /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
-   * </pre>
-   */
-  val AclZkPath = ZkUtils.KafkaAclPath
-
-  //notification node which gets updated with the resource name when acl on a resource is changed.
-  val AclChangedZkPath = ZkUtils.KafkaAclChangesPath
-
-  //prefix of all the change notification sequence node.
-  val AclChangedPrefix = "acl_changes_"
-
-  private case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
+  case class VersionedAcls(acls: Set[Acl], zkVersion: Int)
 }
 
 class SimpleAclAuthorizer extends Authorizer with Logging {
   private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
   private var superUsers = Set.empty[KafkaPrincipal]
   private var shouldAllowEveryoneIfNoAclIsFound = false
-  private var zkUtils: ZkUtils = null
+  private var zkClient: KafkaZkClient = null
   private var aclChangeListener: ZkNodeChangeNotificationListener = null
 
   private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls]
@@ -92,7 +76,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     configs.foreach { case (key, value) => props.put(key, value.toString) }
 
     superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
-      case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
+      case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
     }.getOrElse(Set.empty[KafkaPrincipal])
 
     shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
@@ -104,17 +88,16 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
     val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
     val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
+    val zkMaxInFlightRequests = configs.get(SimpleAclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
 
-    zkUtils = ZkUtils(zkUrl,
-                      sessionTimeout = zkSessionTimeOutMs,
-                      connectionTimeout = zkConnectionTimeoutMs,
-                      kafkaConfig.zkEnableSecureAcls)
-    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
+    val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, zkConnectionTimeoutMs, zkMaxInFlightRequests)
+
+    zkClient = new KafkaZkClient(zooKeeperClient, kafkaConfig.zkEnableSecureAcls)
+    zkClient.createAclPaths()
 
     loadCache()
 
-    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler)
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path, AclChangeNotificationSequenceZNode.SequenceNumberPrefix, AclChangedNotificationHandler)
     aclChangeListener.init()
   }
 
@@ -192,7 +175,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   override def removeAcls(resource: Resource): Boolean = {
     inWriteLock(lock) {
-      val result = zkUtils.deletePath(toResourcePath(resource))
+      val result = zkClient.deleteResource(resource)
       updateCache(resource, VersionedAcls(Set(), 0))
       updateAclChangedFlag(resource)
       result
@@ -223,28 +206,23 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   def close() {
     if (aclChangeListener != null) aclChangeListener.close()
-    if (zkUtils != null) zkUtils.close()
+    if (zkClient != null) zkClient.close()
   }
 
   private def loadCache()  {
     inWriteLock(lock) {
-      val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
+      val resourceTypes = zkClient.getResourceTypes()
       for (rType <- resourceTypes) {
         val resourceType = ResourceType.fromString(rType)
-        val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
-        val resourceNames = zkUtils.getChildren(resourceTypePath)
+        val resourceNames = zkClient.getResourceNames(resourceType.name)
         for (resourceName <- resourceNames) {
-          val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString))
+          val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName))
           updateCache(new Resource(resourceType, resourceName), versionedAcls)
         }
       }
     }
   }
 
-  def toResourcePath(resource: Resource): String = {
-    SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + resource.name
-  }
-
   private def logAuditMessage(principal: KafkaPrincipal, authorized: Boolean, operation: Operation, resource: Resource, host: String) {
     def logMessage: String = {
       val authResult = if (authorized) "Allowed" else "Denied"
@@ -266,8 +244,6 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     * @return boolean indicating if a change was made
     */
   private def updateResourceAcls(resource: Resource)(getNewAcls: Set[Acl] => Set[Acl]): Boolean = {
-    val path = toResourcePath(resource)
-
     var currentVersionedAcls =
       if (aclCache.contains(resource))
         getAclsFromCache(resource)
@@ -278,13 +254,12 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     var retries = 0
     while (!writeComplete && retries <= maxUpdateRetries) {
       val newAcls = getNewAcls(currentVersionedAcls.acls)
-      val data = Json.encode(Acl.toJsonCompatibleMap(newAcls))
       val (updateSucceeded, updateVersion) =
         if (newAcls.nonEmpty) {
-         updatePath(path, data, currentVersionedAcls.zkVersion)
+          zkClient.conditionalSetOrCreateAclsForResource(resource, newAcls, currentVersionedAcls.zkVersion)
         } else {
           trace(s"Deleting path for $resource because it had no ACLs remaining")
-          (zkUtils.conditionalDeletePath(path, currentVersionedAcls.zkVersion), 0)
+          (zkClient.conditionalDelete(resource, currentVersionedAcls.zkVersion), 0)
         }
 
       if (!updateSucceeded) {
@@ -313,34 +288,12 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     }
   }
 
-  /**
-    * Updates a zookeeper path with an expected version. If the topic does not exist, it will create it.
-    * Returns if the update was successful and the new version.
-    */
-  private def updatePath(path: String, data: String, expectedVersion: Int): (Boolean, Int) = {
-    try {
-      zkUtils.conditionalUpdatePersistentPathIfExists(path, data, expectedVersion)
-    } catch {
-      case _: ZkNoNodeException =>
-        try {
-          debug(s"Node $path does not exist, attempting to create it.")
-          zkUtils.createPersistentPath(path, data)
-          (true, 0)
-        } catch {
-          case _: ZkNodeExistsException =>
-            debug(s"Failed to create node for $path because it already exists.")
-            (false, 0)
-        }
-    }
-  }
-
   private def getAclsFromCache(resource: Resource): VersionedAcls = {
     aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
   }
 
   private def getAclsFromZk(resource: Resource): VersionedAcls = {
-    val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
-    VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion)
+    zkClient.getVersionedAclsForResource(resource)
   }
 
   private def updateCache(resource: Resource, versionedAcls: VersionedAcls) {
@@ -352,7 +305,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   private def updateAclChangedFlag(resource: Resource) {
-    zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
+    zkClient.createAclChangeNotification(resource.toString)
   }
 
   private def backoffTime = {
@@ -368,4 +321,5 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       }
     }
   }
-}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 634b0c2..0f1abfc 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -26,6 +26,7 @@ import scala.collection._
 import scala.collection.JavaConverters._
 import kafka.admin.AdminUtils
 import kafka.utils.json.JsonObject
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.security.scram.ScramMechanism
 import org.apache.kafka.common.utils.Time
@@ -83,7 +84,8 @@ object ConfigEntityName {
  * on startup where a change might be missed between the initial config load and registering for change notifications.
  *
  */
-class DynamicConfigManager(private val zkUtils: ZkUtils,
+class DynamicConfigManager(private val oldZkUtils: ZkUtils,
+                           private val zkClient: KafkaZkClient,
                            private val configHandlers: Map[String, ConfigHandler],
                            private val changeExpirationMs: Long = 15*60*1000,
                            private val time: Time = Time.SYSTEM) extends Logging {
@@ -118,7 +120,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
         throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
       }
 
-      val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
+      val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, entityType, entity)
       logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
       configHandlers(entityType).processConfigChanges(entity, entityConfig)
 
@@ -139,7 +141,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
       }
       val fullSanitizedEntityName = entityPath.substring(index + 1)
 
-      val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, rootEntityType, fullSanitizedEntityName)
+      val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, rootEntityType, fullSanitizedEntityName)
       val loggableConfig = entityConfig.asScala.map {
         case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v)
       }
@@ -149,7 +151,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
     }
   }
 
-  private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.ConfigChangesPath,
+  private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ZkUtils.ConfigChangesPath,
     AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
 
   /**
@@ -161,16 +163,20 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
     // Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides
     configHandlers.foreach {
       case (ConfigType.User, handler) =>
-        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.User).foreach {
+        AdminUtils.fetchAllEntityConfigs(oldZkUtils, ConfigType.User).foreach {
           case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
         }
-        AdminUtils.fetchAllChildEntityConfigs(zkUtils, ConfigType.User, ConfigType.Client).foreach {
+        AdminUtils.fetchAllChildEntityConfigs(oldZkUtils, ConfigType.User, ConfigType.Client).foreach {
           case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
         }
       case (configType, handler) =>
-        AdminUtils.fetchAllEntityConfigs(zkUtils, configType).foreach {
+        AdminUtils.fetchAllEntityConfigs(oldZkUtils, configType).foreach {
           case (entityName, properties) => handler.processConfigChanges(entityName, properties)
         }
     }
   }
+
+  def shutdown(): Unit = {
+    configChangeListener.close()
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index dff83db..df36662 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -37,7 +37,7 @@ import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import kafka.zk.KafkaZkClient
-import kafka.zookeeper.{StateChangeHandler, ZooKeeperClient}
+import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
@@ -221,20 +221,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
-        val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
-          config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, new StateChangeHandler {
-            override def onReconnectionTimeout(): Unit = {
-              error("Reconnection timeout.")
-            }
-
-            override def afterInitializingSession(): Unit = kafkaController.newSession()
-
-            override def onAuthFailure(): Unit = {
-              error("Auth failure.")
-            }
-
-            override def beforeInitializingSession(): Unit = kafkaController.expire()
-          })
+        val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests)
         zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure)
 
         /* start log manager */
@@ -291,7 +278,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                                                            ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
         // Create the config manager. start listening to notifications
-        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
+        dynamicConfigManager = new DynamicConfigManager(zkUtils, zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
         /* tell everyone we are alive */
@@ -536,6 +523,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         if (kafkaHealthcheck != null)
           CoreUtils.swallow(kafkaHealthcheck.shutdown())
 
+        if (dynamicConfigManager != null)
+          CoreUtils.swallow(dynamicConfigManager.shutdown())
+
         if (socketServer != null)
           CoreUtils.swallow(socketServer.shutdown())
         if (requestHandlerPool != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 3267a74..97b7c98 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -23,6 +23,8 @@ import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
+import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.ConfigType
 import kafka.utils._
 import kafka.zookeeper._
@@ -348,6 +350,47 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Gets the data and Stat at the given zk path
+   * @param path zk node path
+   * @return A tuple of 2 elements, where first element is zk node data as string
+   *         and second element is zk node stats.
+   *         returns (None, new Stat()) if node doesn't exists and throws exception for any error
+   */
+  def getDataAndStat(path: String): (Option[String], Stat) = {
+    val getDataRequest = GetDataRequest(path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+
+    if (getDataResponse.resultCode == Code.OK) {
+      if (getDataResponse.data == null)
+        (None, getDataResponse.stat)
+      else {
+        val data = new String(getDataResponse.data, UTF_8)
+        (Some(data), getDataResponse.stat)
+      }
+    } else if (getDataResponse.resultCode  == Code.NONODE) {
+      (None, new Stat())
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Gets all the child nodes at a given zk node path
+   * @param path
+   * @return list of child node names
+   */
+  def getChildren(path : String): Seq[String] = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      getChildrenResponse.children
+    } else if (getChildrenResponse.resultCode == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
    * exist, the current version is not the expected version, etc.) return (false, -1)
    *
@@ -528,7 +571,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def deleteIsrChangeNotifications(): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteIsrChangeNotifications(getChildrenResponse.children)
+      deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       throw getChildrenResponse.resultException.get
     }
@@ -627,6 +670,183 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     retryRequestsUntilConnected(deleteRequests)
   }
 
+  //Acl management methods
+
+  /**
+   * Creates the required zk nodes for Acl storage
+   */
+  def createAclPaths(): Unit = {
+    createRecursive(AclZNode.path)
+    createRecursive(AclChangeNotificationZNode.path)
+    ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name)))
+  }
+
+  /**
+   * Gets VersionedAcls for a given Resource
+   * @param resource Resource to get VersionedAcls for
+   * @return  VersionedAcls
+   */
+  def getVersionedAclsForResource(resource: Resource): VersionedAcls = {
+    val getDataRequest = GetDataRequest(ResourceZNode.path(resource))
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    if (getDataResponse.resultCode == Code.OK) {
+      ResourceZNode.decode(getDataResponse.data, getDataResponse.stat)
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      VersionedAcls(Set(), -1)
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Sets or creates the resource znode path with the given acls and expected zk version depending
+   * on whether it already exists or not.
+   * @param resource
+   * @param aclsSet
+   * @param expectedVersion
+   * @return true if the update was successful and the new version
+   */
+  def conditionalSetOrCreateAclsForResource(resource: Resource, aclsSet: Set[Acl], expectedVersion: Int): (Boolean, Int) = {
+    def set(aclData: Array[Byte],  expectedVersion: Int): SetDataResponse = {
+      val setDataRequest = SetDataRequest(ResourceZNode.path(resource), aclData, expectedVersion)
+      retryRequestUntilConnected(setDataRequest)
+    }
+
+    def create(aclData: Array[Byte]): CreateResponse = {
+      val path = ResourceZNode.path(resource)
+      val createRequest = CreateRequest(path, aclData, acls(path), CreateMode.PERSISTENT)
+      retryRequestUntilConnected(createRequest)
+    }
+
+    val aclData = ResourceZNode.encode(aclsSet)
+
+    val setDataResponse = set(aclData, expectedVersion)
+    setDataResponse.resultCode match {
+      case Code.OK =>
+        (true, setDataResponse.stat.getVersion)
+      case Code.NONODE => {
+        val createResponse = create(aclData)
+        createResponse.resultCode match {
+          case Code.OK =>
+            (true, 0)
+          case Code.NODEEXISTS =>
+            (false, 0)
+          case _ =>
+            throw createResponse.resultException.get
+        }
+      }
+      case Code.BADVERSION =>
+        (false, 0)
+      case _ =>
+        throw setDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Creates Acl change notification message
+   * @param resourceName resource name
+   */
+  def createAclChangeNotification(resourceName: String): Unit = {
+    val path = AclChangeNotificationSequenceZNode.createPath
+    val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resourceName), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createResponse = retryRequestUntilConnected(createRequest)
+    if (createResponse.resultCode != Code.OK) {
+      throw createResponse.resultException.get
+    }
+  }
+
+  /**
+   * Deletes all Acl change notifications.
+   * @throws KeeperException if there is an error while deleting Acl change notifications
+   */
+  def deleteAclChangeNotifications(): Unit = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(AclChangeNotificationZNode.path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      deleteAclChangeNotifications(getChildrenResponse.children)
+    } else if (getChildrenResponse.resultCode != Code.NONODE) {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
+   * Deletes the Acl change notifications associated with the given sequence nodes
+   * @param sequenceNodes
+   */
+  private def deleteAclChangeNotifications(sequenceNodes: Seq[String]): Unit = {
+    val deleteRequests = sequenceNodes.map { sequenceNode =>
+      DeleteRequest(AclChangeNotificationSequenceZNode.deletePath(sequenceNode), ZkVersion.NoVersion)
+    }
+
+    val deleteResponses = retryRequestsUntilConnected(deleteRequests)
+    deleteResponses.foreach { deleteResponse =>
+      if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
+        throw deleteResponse.resultException.get
+      }
+    }
+  }
+
+  /**
+   * Gets the resource types
+   * @return list of resource type names
+   */
+  def getResourceTypes(): Seq[String] = {
+    getChildren(AclZNode.path)
+  }
+
+  /**
+   * Gets the resource names for a give resource type
+   * @param resourceType
+   * @return list of resource names
+   */
+  def getResourceNames(resourceType: String): Seq[String] = {
+    getChildren(ResourceTypeZNode.path(resourceType))
+  }
+
+  /**
+   * Deletes the given Resource node
+   * @param resource
+   * @return delete status
+   */
+  def deleteResource(resource: Resource): Boolean = {
+    deleteRecursive(ResourceZNode.path(resource))
+  }
+
+  /**
+   * checks the resource existence
+   * @param resource
+   * @return existence status
+   */
+  def resourceExists(resource: Resource): Boolean = {
+    pathExists(ResourceZNode.path(resource))
+  }
+
+  /**
+   * Conditional delete the resource node
+   * @param resource
+   * @param expectedVersion
+   * @return return true if it succeeds, false otherwise (the current version is not the expected version)
+   */
+  def conditionalDelete(resource: Resource, expectedVersion: Int): Boolean = {
+    val deleteRequest = DeleteRequest(ResourceZNode.path(resource), expectedVersion)
+    val deleteResponse = retryRequestUntilConnected(deleteRequest)
+    if (deleteResponse.resultCode == Code.OK || deleteResponse.resultCode == Code.NONODE) {
+      true
+    } else if (deleteResponse.resultCode == Code.BADVERSION) {
+      false
+    } else {
+      throw deleteResponse.resultException.get
+    }
+  }
+  
+  /**
+   * Deletes the zk node recursively
+   * @param path
+   * @return  return true if it succeeds, false otherwise
+   */
+  def deletePath(path: String): Boolean = {
+    deleteRecursive(path)
+  }
+
   /**
    * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data
    * watcher registrations on paths which might not even exist.
@@ -678,6 +898,22 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   *
+   * @param stateChangeHandler
+   */
+  def registerStateChangeHandler(stateChangeHandler: StateChangeHandler): Unit = {
+    zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
+  }
+
+  /**
+   *
+   * @param name
+   */
+  def unregisterStateChangeHandler(name: String): Unit = {
+    zooKeeperClient.unregisterStateChangeHandler(name)
+  }
+
+  /**
    * Close the underlying ZooKeeperClient.
    */
   def close(): Unit = {
@@ -738,7 +974,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     createResponse
   }
 
-  private[zk] def deleteRecursive(path: String): Unit = {
+  /**
+   * Deletes the given zk path recursively
+   * @param path
+   * @return true if path gets deleted successfully, false if root path doesn't exists
+   * @throws KeeperException if there is an error while deleting the znodes
+   */
+  private[zk] def deleteRecursive(path: String): Boolean = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
     if (getChildrenResponse.resultCode == Code.OK) {
       getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
@@ -746,15 +988,22 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
         throw deleteResponse.resultException.get
       }
-    } else if (getChildrenResponse.resultCode != Code.NONODE) {
+      true
+    } else if (getChildrenResponse.resultCode == Code.NONODE) {
+      false
+    } else
       throw getChildrenResponse.resultException.get
-    }
   }
 
   private[zk] def pathExists(path: String): Boolean = {
     val getDataRequest = GetDataRequest(path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    getDataResponse.resultCode == Code.OK
+    if (getDataResponse.resultCode == Code.OK) {
+      true
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      false
+    } else
+      throw getDataResponse.resultException.get
   }
 
   private[zk] def createRecursive(path: String): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index a1ff559..400f0c7 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -22,6 +22,8 @@ import java.util.Properties
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
+import kafka.security.auth.{Acl, Resource}
+import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
@@ -252,3 +254,45 @@ object ConsumerOffset {
 object ZkVersion {
   val NoVersion = -1
 }
+
+object StateChangeHandlers {
+  val ControllerHandler = "controller-state-change-handler"
+  def zkNodeChangeListenerHandler(seqNodeRoot: String) = s"change-notification-$seqNodeRoot"
+}
+
+/**
+ * The root acl storage node. Under this node there will be one child node per resource type (Topic, Cluster, Group).
+ * under each resourceType there will be a unique child for each resource instance and the data for that child will contain
+ * list of its acls as a json object. Following gives an example:
+ *
+ * <pre>
+ * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+ * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+ * /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+ * </pre>
+ */
+object AclZNode {
+  def path = "/kafka-acl"
+}
+
+object ResourceTypeZNode {
+  def path(resourceType: String) = s"${AclZNode.path}/$resourceType"
+}
+
+object ResourceZNode {
+  def path(resource: Resource) = s"${AclZNode.path}/${resource.resourceType}/${resource.name}"
+  def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls))
+  def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
+}
+
+object AclChangeNotificationZNode {
+  def path = "/kafka-acl-changes"
+}
+
+object AclChangeNotificationSequenceZNode {
+  val SequenceNumberPrefix = "acl_changes_"
+  def createPath = s"${AclChangeNotificationZNode.path}/$SequenceNumberPrefix"
+  def deletePath(sequenceNode: String) = s"${AclChangeNotificationZNode.path}/${sequenceNode}"
+  def encode(resourceName : String): Array[Byte] = resourceName.getBytes(UTF_8)
+  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 149e7eb..ef17059 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -38,13 +38,11 @@ import scala.collection.JavaConverters._
  * @param sessionTimeoutMs session timeout in milliseconds
  * @param connectionTimeoutMs connection timeout in milliseconds
  * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking.
- * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
  */
 class ZooKeeperClient(connectString: String,
                       sessionTimeoutMs: Int,
                       connectionTimeoutMs: Int,
-                      maxInFlightRequests: Int,
-                      stateChangeHandler: StateChangeHandler) extends Logging {
+                      maxInFlightRequests: Int) extends Logging {
   this.logIdent = "[ZooKeeperClient] "
   private val initializationLock = new ReentrantReadWriteLock()
   private val isConnectedOrExpiredLock = new ReentrantLock()
@@ -52,6 +50,7 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
+  private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
 
   info(s"Initializing a new session to $connectString.")
   @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
@@ -233,10 +232,27 @@ class ZooKeeperClient(connectString: String,
     zNodeChildChangeHandlers.remove(path)
   }
 
+  /**
+   * @param stateChangeHandler
+   */
+  def registerStateChangeHandler(stateChangeHandler: StateChangeHandler): Unit = inReadLock(initializationLock) {
+    if (stateChangeHandler != null)
+      stateChangeHandlers.put(stateChangeHandler.name, stateChangeHandler)
+  }
+
+  /**
+   *
+   * @param name
+   */
+  def unregisterStateChangeHandler(name: String): Unit = inReadLock(initializationLock) {
+    stateChangeHandlers.remove(name)
+  }
+
   def close(): Unit = inWriteLock(initializationLock) {
     info("Closing.")
     zNodeChangeHandlers.clear()
     zNodeChildChangeHandlers.clear()
+    stateChangeHandlers.clear()
     zooKeeper.close()
     info("Closed.")
   }
@@ -266,10 +282,18 @@ class ZooKeeperClient(connectString: String,
         }
       }
       info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}")
-      stateChangeHandler.onReconnectionTimeout()
+      stateChangeHandlers.foreach {case (name, handler) => handler.onReconnectionTimeout()}
     }
   }
 
+  /**
+   * reinitialize method to use in unit tests
+   */
+  private[zookeeper] def reinitialize(): Unit = {
+    zooKeeper.close()
+    initialize()
+  }
+
   private object ZooKeeperClientWatcher extends Watcher {
     override def process(event: WatchedEvent): Unit = {
       debug("Received event: " + event)
@@ -279,14 +303,14 @@ class ZooKeeperClient(connectString: String,
             isConnectedOrExpiredCondition.signalAll()
           }
           if (event.getState == KeeperState.AuthFailed) {
-            info("Auth failed.")
-            stateChangeHandler.onAuthFailure()
+            error("Auth failed.")
+            stateChangeHandlers.foreach {case (name, handler) => handler.onAuthFailure()}
           } else if (event.getState == KeeperState.Expired) {
             inWriteLock(initializationLock) {
               info("Session expired.")
-              stateChangeHandler.beforeInitializingSession()
+              stateChangeHandlers.foreach {case (name, handler) => handler.beforeInitializingSession()}
               initialize()
-              stateChangeHandler.afterInitializingSession()
+              stateChangeHandlers.foreach {case (name, handler) => handler.afterInitializingSession()}
             }
           }
         case Some(path) =>
@@ -302,6 +326,7 @@ class ZooKeeperClient(connectString: String,
 }
 
 trait StateChangeHandler {
+  val name: String
   def beforeInitializingSession(): Unit = {}
   def afterInitializingSession(): Unit = {}
   def onAuthFailure(): Unit = {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 7fc2436..99550d5 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -16,14 +16,11 @@
  */
 package kafka.common
 
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness}
 import org.junit.Test
 
-class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
-
-  override def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
 
   @Test
   def testProcessNotification() {
@@ -36,18 +33,16 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
       }
     }
 
-    val seqNodeRoot = "/root"
-    val seqNodePrefix = "prefix"
-    val seqNodePath = seqNodeRoot + "/" + seqNodePrefix
+    zkClient.createAclPaths()
     val notificationMessage1 = "message1"
     val notificationMessage2 = "message2"
     val changeExpirationMs = 1000
 
-    val notificationListener = new ZkNodeChangeNotificationListener(zkUtils, seqNodeRoot, seqNodePrefix, notificationHandler, changeExpirationMs)
+    val notificationListener = new ZkNodeChangeNotificationListener(zkClient,  AclChangeNotificationZNode.path,
+      AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs)
     notificationListener.init()
 
-    zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1)
-
+    zkClient.createAclChangeNotification(notificationMessage1)
     TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1,
       "Failed to send/process notification message in the timeout period.")
 
@@ -59,11 +54,11 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
      * can fail as the second node can be deleted depending on how threads get scheduled.
      */
 
-    zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2)
+    zkClient.createAclChangeNotification(notificationMessage2)
     TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2,
       "Failed to send/process notification message in the timeout period.")
 
-    (3 to 10).foreach(i => zkUtils.createSequentialPersistentPath(seqNodePath, "message" + i))
+    (3 to 10).foreach(i => zkClient.createAclChangeNotification("message" + i))
 
     TestUtils.waitUntilTrue(() => invocationCount == 10 ,
       s"Expected 10 invocations of processNotifications, but there were $invocationCount")

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
index dd33dc4..dfdd85f 100644
--- a/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/AclTest.scala
@@ -16,9 +16,11 @@
  */
 package kafka.security.auth
 
+import java.nio.charset.StandardCharsets.UTF_8
+
 import kafka.utils.Json
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.{Test, Assert}
+import org.junit.{Assert, Test}
 import org.scalatest.junit.JUnitSuite
 
 class AclTest extends JUnitSuite {
@@ -36,8 +38,8 @@ class AclTest extends JUnitSuite {
     val acls = Set[Acl](acl1, acl2, acl3)
     val jsonAcls = Json.encode(Acl.toJsonCompatibleMap(acls))
 
-    Assert.assertEquals(acls, Acl.fromJson(jsonAcls))
-    Assert.assertEquals(acls, Acl.fromJson(AclJson))
+    Assert.assertEquals(acls, Acl.fromBytes(jsonAcls.getBytes(UTF_8)))
+    Assert.assertEquals(acls, Acl.fromBytes(AclJson.getBytes(UTF_8)))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 22a06e7..1e18f1d 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -237,12 +237,12 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     //test remove all acls for resource
     simpleAclAuthorizer.removeAcls(resource)
     TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
-    assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
+    assertTrue(!zkClient.resourceExists(resource))
 
     //test removing last acl also deletes ZooKeeper path
     acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
     changeAclAndVerify(acls, Set.empty[Acl], acls)
-    assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
+    assertTrue(!zkClient.resourceExists(resource))
   }
 
   @Test
@@ -258,7 +258,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     val acls1 = Set[Acl](acl2)
     simpleAclAuthorizer.addAcls(acls1, resource1)
 
-    zkUtils.deletePathRecursive(SimpleAclAuthorizer.AclChangedZkPath)
+    zkClient.deleteAclChangeNotifications
     val authorizer = new SimpleAclAuthorizer
     try {
       authorizer.configure(config.originals)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 9d2bb8b..dd7bb5f 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -187,7 +187,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     EasyMock.expectLastCall().once()
     EasyMock.replay(handler)
 
-    val configManager = new DynamicConfigManager(zkUtils, Map(ConfigType.Topic -> handler))
+    val configManager = new DynamicConfigManager(zkUtils, zkClient, Map(ConfigType.Topic -> handler))
     // Notifications created using the old TopicConfigManager are ignored.
     configManager.ConfigChangedNotificationHandler.processNotification("not json")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 775c68e..347569d 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,33 +16,19 @@
 */
 package kafka.zk
 
-import kafka.server.Defaults
-import kafka.zookeeper.ZooKeeperClient
+import java.util.UUID
+
+import kafka.security.auth._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
-import org.junit.{After, Before, Test}
+import org.junit.Test
 
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
-  private var zooKeeperClient: ZooKeeperClient = null
-  private var zkClient: KafkaZkClient = null
-
   private val group = "my-group"
   private val topicPartition = new TopicPartition("topic", 0)
 
-  @Before
-  override def setUp() {
-    super.setUp()
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Defaults.ZkMaxInFlightRequests, null)
-    zkClient = new KafkaZkClient(zooKeeperClient, false)
-  }
-
-  @After
-  override def tearDown() {
-    zkClient.close()
-    super.tearDown()
-  }
-
   @Test
   def testSetAndGetConsumerOffset() {
     val offset = 123L
@@ -179,4 +165,117 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Map.empty, zkClient.getPartitionReassignment)
   }
 
+  @Test
+  def testGetDataAndStat() {
+    val path = "/testpath"
+
+    // test with non-existing path
+    var dataAndVersion = zkClient.getDataAndStat(path)
+    assertTrue(dataAndVersion._1.isEmpty)
+    assertEquals(0, dataAndVersion._2.getVersion)
+
+    // create a test path
+    zkClient.createRecursive(path)
+    zkClient.conditionalUpdatePath(path, "version1", 0)
+
+    // test with existing path
+    dataAndVersion = zkClient.getDataAndStat(path)
+    assertEquals("version1", dataAndVersion._1.get)
+    assertEquals(1, dataAndVersion._2.getVersion)
+
+    zkClient.conditionalUpdatePath(path, "version2", 1)
+    dataAndVersion = zkClient.getDataAndStat(path)
+    assertEquals("version2", dataAndVersion._1.get)
+    assertEquals(2, dataAndVersion._2.getVersion)
+  }
+
+  @Test
+  def testGetChildren() {
+    val path = "/testpath"
+
+    // test with non-existing path
+    assertTrue(zkClient.getChildren(path).isEmpty)
+
+    // create child nodes
+    zkClient.createRecursive( "/testpath/child1")
+    zkClient.createRecursive( "/testpath/child2")
+    zkClient.createRecursive( "/testpath/child3")
+
+    val children = zkClient.getChildren(path)
+
+    assertEquals(3, children.size)
+    assertEquals(Set("child1","child2","child3"), children.toSet)
+  }
+
+  @Test
+  def testAclManagementMethods() {
+
+    assertFalse(zkClient.pathExists(AclZNode.path))
+    assertFalse(zkClient.pathExists(AclChangeNotificationZNode.path))
+    ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(ResourceTypeZNode.path(resource.name))))
+
+    // create acl paths
+    zkClient.createAclPaths
+
+    assertTrue(zkClient.pathExists(AclZNode.path))
+    assertTrue(zkClient.pathExists(AclChangeNotificationZNode.path))
+    ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(ResourceTypeZNode.path(resource.name))))
+
+    val resource1 = new Resource(Topic, UUID.randomUUID().toString)
+    val resource2 = new Resource(Topic, UUID.randomUUID().toString)
+
+    // try getting acls for non-existing resource
+    var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+    assertTrue(versionedAcls.acls.isEmpty)
+    assertEquals(-1, versionedAcls.zkVersion)
+    assertFalse(zkClient.resourceExists(resource1))
+
+
+    val acl1 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"), Deny, "host1" , Read)
+    val acl2 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Allow, "*", Read)
+    val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Deny, "host1", Read)
+
+    //create acls for resources
+    zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl2), 0)
+    zkClient.conditionalSetOrCreateAclsForResource(resource2, Set(acl1, acl3), 0)
+
+    versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+    assertEquals(Set(acl1, acl2), versionedAcls.acls)
+    assertEquals(0, versionedAcls.zkVersion)
+    assertTrue(zkClient.resourceExists(resource1))
+
+    //update acls for resource
+    zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl3), 0)
+
+    versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+    assertEquals(Set(acl1, acl3), versionedAcls.acls)
+    assertEquals(1, versionedAcls.zkVersion)
+
+    //get resource Types
+    assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes().toSet)
+
+    //get resource name
+    val resourceNames = zkClient.getResourceNames(Topic.name)
+    assertEquals(2, resourceNames.size)
+    assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet)
+
+    //delete resource
+    assertTrue(zkClient.deleteResource(resource1))
+    assertFalse(zkClient.resourceExists(resource1))
+
+    //delete with invalid expected zk version
+    assertFalse(zkClient.conditionalDelete(resource2, 10))
+    //delete with valid expected zk version
+    assertTrue(zkClient.conditionalDelete(resource2, 0))
+
+
+    zkClient.createAclChangeNotification("resource1")
+    zkClient.createAclChangeNotification("resource2")
+
+    assertEquals(2, zkClient.getChildren(AclChangeNotificationZNode.path).size)
+
+    zkClient.deleteAclChangeNotifications()
+    assertTrue(zkClient.getChildren(AclChangeNotificationZNode.path).isEmpty)
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 85a5596..03741ef 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -32,15 +32,20 @@ import scala.collection.JavaConverters._
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
+import kafka.zookeeper.ZooKeeperClient
 
 @Category(Array(classOf[IntegrationTest]))
 abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
   val zkConnectionTimeout = 10000
   val zkSessionTimeout = 6000
+  val zkMaxInFlightRequests = Int.MaxValue
+
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkUtils: ZkUtils = null
+  var zooKeeperClient: ZooKeeperClient = null
+  var zkClient: KafkaZkClient = null
   var zookeeper: EmbeddedZookeeper = null
 
   def zkPort: Int = zookeeper.port
@@ -50,12 +55,17 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
     zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
+
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests)
+    zkClient = new KafkaZkClient(zooKeeperClient, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
   }
 
   @After
   def tearDown() {
     if (zkUtils != null)
      CoreUtils.swallow(zkUtils.close())
+    if (zkClient != null)
+     zkClient.close()
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown())
     Configuration.setConfiguration(null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b44c3e7/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 50a065f..ea6a475 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -42,23 +42,22 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test(expected = classOf[UnknownHostException])
   def testUnresolvableConnectString(): Unit = {
-    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, null)
+    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue)
   }
 
   @Test(expected = classOf[ZooKeeperClientTimeoutException])
   def testConnectionTimeout(): Unit = {
     zookeeper.shutdown()
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, null)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue)
   }
 
   @Test
   def testConnection(): Unit = {
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue)
   }
 
   @Test
   def testDeleteNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
     assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
   }
@@ -66,7 +65,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testDeleteExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
     val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
@@ -75,7 +73,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testExistsNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
     assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode)
   }
@@ -83,7 +80,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testExistsExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
     val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
@@ -92,7 +88,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testGetDataNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
     assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode)
   }
@@ -101,7 +96,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   def testGetDataExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
     val data = bytes
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
       CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -112,7 +106,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testSetDataNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
     assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode)
   }
@@ -121,7 +114,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   def testSetDataExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
     val data = bytes
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
       ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -134,7 +126,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testGetAclNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
     assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode)
   }
@@ -142,7 +133,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testGetAclExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
     val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
@@ -153,14 +143,12 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testSetAclNonExistentZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
     assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode)
   }
 
   @Test
   def testGetChildrenNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
     assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode)
   }
@@ -168,7 +156,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testGetChildrenExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
       ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -184,7 +171,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     val child2 = "child2"
     val child1Path = mockPath + "/" + child1
     val child2Path = mockPath + "/" + child2
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
       ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -203,7 +189,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testPipelinedGetData(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     val createResponses = createRequests.map(zooKeeperClient.handleRequest)
     createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode))
@@ -220,7 +205,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testMixedPipeline(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
       ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -235,7 +219,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testZNodeChangeHandlerForCreation(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
       override def handleCreation(): Unit = {
@@ -256,7 +239,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testZNodeChangeHandlerForDeletion(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
       override def handleDeletion(): Unit = {
@@ -279,7 +261,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testZNodeChangeHandlerForDataChange(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
       override def handleDataChange(): Unit = {
@@ -302,7 +283,6 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testZNodeChildChangeHandlerForChildChange(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
       override def handleChildChange(): Unit = {
@@ -328,18 +308,24 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
     val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
     val stateChangeHandler = new StateChangeHandler {
+      override val name: String =  this.getClass.getName
+
       override def onAuthFailure(): Unit = {
         stateChangeHandlerCountDownLatch.countDown()
       }
     }
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, stateChangeHandler)
+
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue)
+    zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
+    zooKeeperClient.reinitialize()
+
     assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
   @Test
   def testConnectionLossRequestTermination(): Unit = {
     val batchSize = 10
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2)
     zookeeper.shutdown()
     val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
     val countDownLatch = new CountDownLatch(1)