Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/19 00:24:09 UTC

[4/5] kafka git commit: KAFKA-2639: Refactoring of ZkUtils

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 2027ec8..f39b9a1 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -37,6 +37,7 @@ import kafka.utils.ZkUtils._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
 import scala.collection._
@@ -88,8 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
-  private var zkClient: ZkClient = null
-  private var zkConnection : ZkConnection = null
+  private var zkUtils: ZkUtils = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
   private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
@@ -173,21 +173,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private def createFetcher() {
     if (enableFetcher)
-      fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
+      fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkUtils))
   }
 
   private def connectZk() {
     info("Connecting to zookeeper instance at " + config.zkConnect)
-    val (client, connection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
-    zkClient = client
-    zkConnection = connection
+    zkUtils = ZkUtils(config.zkConnect,
+                      config.zkSessionTimeoutMs,
+                      config.zkConnectionTimeoutMs,
+                      JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
   }
 
   // Blocks until the offset manager is located and a channel is established to it.
   private def ensureOffsetManagerConnected() {
     if (config.offsetsStorage == "kafka") {
       if (offsetsChannel == null || !offsetsChannel.isConnected)
-        offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient,
+        offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkUtils,
           config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs)
 
       debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port))
@@ -213,9 +214,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           sendShutdownToAllQueues()
           if (config.autoCommitEnable)
             commitOffsets(true)
-          if (zkClient != null) {
-            zkClient.close()
-            zkClient = null
+          if (zkUtils != null) {
+            zkUtils.close()
+            zkUtils = null
           }
 
           if (offsetsChannel != null) offsetsChannel.disconnect()
@@ -266,7 +267,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
                                                     consumerRegistryDir + "/" + consumerIdString, 
                                                     consumerRegistrationInfo,
-                                                    zkConnection.getZookeeper)
+                                                    zkUtils.zkConnection.getZookeeper,
+                                                    false)
     zkWatchedEphemeral.create()
 
     info("end registering consumer " + consumerIdString + " in ZK")
@@ -296,7 +298,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
     if (checkpointedZkOffsets.get(topicPartition) != offset) {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
-      updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
+      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
       checkpointedZkOffsets.put(topicPartition, offset)
       zkCommitMeter.mark()
     }
@@ -404,7 +406,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
     val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
-    val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
+    val offsetString = zkUtils.readDataMaybeNull(dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
     offsetString match {
       case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong))
       case None => (topicPartition, OffsetMetadataAndError.NoOffset)
@@ -599,7 +601,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private def deletePartitionOwnershipFromZK(topic: String, partition: Int) {
       val topicDirs = new ZKGroupTopicDirs(group, topic)
       val znode = topicDirs.consumerOwnerDir + "/" + partition
-      deletePath(zkClient, znode)
+      zkUtils.deletePath(znode)
       debug("Consumer " + consumerIdString + " releasing " + znode)
     }
 
@@ -630,7 +632,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             var done = false
             var cluster: Cluster = null
             try {
-              cluster = getCluster(zkClient)
+              cluster = zkUtils.getCluster()
               done = rebalance(cluster)
             } catch {
               case e: Throwable =>
@@ -660,14 +662,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     private def rebalance(cluster: Cluster): Boolean = {
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(
-        group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
-      val brokers = getAllBrokersInCluster(zkClient)
+        group, consumerIdString, zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
+      val brokers = zkUtils.getAllBrokersInCluster()
       if (brokers.size == 0) {
         // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
        // We log a warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
         // are up.
         warn("no brokers found when trying to rebalance.")
-        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
+        zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
         true
       }
       else {
@@ -690,7 +692,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           )
         }
         releasePartitionOwnership(topicRegistry)
-        val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
+        val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkUtils)
         val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
         val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
         val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
@@ -713,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           })
 
           /**
-           * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+           * move the partition ownership here, since that can be used to indicate a truly successful re-balancing attempt
            * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
            */
           if(reflectPartitionOwnershipDecision(partitionAssignment)) {
@@ -832,9 +834,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         val topic = partitionOwner._1.topic
         val partition = partitionOwner._1.partition
         val consumerThreadId = partitionOwner._2
-        val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
+        val partitionOwnerPath = zkUtils.getConsumerPartitionOwnerPath(group, topic, partition)
         try {
-          createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId.toString)
+          zkUtils.createEphemeralPathExpectConflict(partitionOwnerPath, consumerThreadId.toString)
           info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
           successfullyOwnedPartitions ::= (topic, partition)
           true
@@ -951,14 +953,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     })
 
     // listener to consumer and partition changes
-    zkClient.subscribeStateChanges(sessionExpirationListener)
+    zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
 
-    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+    zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
 
     topicStreamsMap.foreach { topicAndStreams =>
       // register on broker partition path changes
       val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
-      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
+      zkUtils.zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
     }
 
     // explicitly trigger load balancing for this consumer
@@ -988,11 +990,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
      // bootstrap with existing topics
     private var wildcardTopics =
-      getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+      zkUtils.getChildrenParentMayNotExist(BrokerTopicsPath)
         .filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))
 
     private val wildcardTopicCount = TopicCount.constructTopicCount(
-      consumerIdString, topicFilter, numStreams, zkClient, config.excludeInternalTopics)
+      consumerIdString, topicFilter, numStreams, zkUtils, config.excludeInternalTopics)
 
     val dirs = new ZKGroupDirs(config.groupId)
     registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
@@ -1002,7 +1004,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
      * Topic events will trigger subsequent synced rebalances.
      */
     info("Creating topic event watcher for topics " + topicFilter)
-    wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkClient, this)
+    wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkUtils, this)
 
     def handleTopicEvent(allTopics: Seq[String]) {
       debug("Handling topic event")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index f74823b..0cd22f0 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -22,7 +22,7 @@ import kafka.utils.{ZkUtils, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
-class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
+class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
     val eventHandler: TopicEventHandler[String]) extends Logging {
 
   val lock = new Object()
@@ -31,24 +31,24 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
 
   private def startWatchingTopicEvents() {
     val topicEventListener = new ZkTopicEventListener()
-    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
+    zkUtils.makeSurePersistentPathExists(ZkUtils.BrokerTopicsPath)
 
-    zkClient.subscribeStateChanges(
+    zkUtils.zkClient.subscribeStateChanges(
       new ZkSessionExpireListener(topicEventListener))
 
-    val topics = zkClient.subscribeChildChanges(
+    val topics = zkUtils.zkClient.subscribeChildChanges(
       ZkUtils.BrokerTopicsPath, topicEventListener).toList
 
     // call to bootstrap topic list
     topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics)
   }
 
-  private def stopWatchingTopicEvents() { zkClient.unsubscribeAll() }
+  private def stopWatchingTopicEvents() { zkUtils.zkClient.unsubscribeAll() }
 
   def shutdown() {
     lock.synchronized {
       info("Shutting down topic event watcher.")
-      if (zkClient != null) {
+      if (zkUtils != null) {
         stopWatchingTopicEvents()
       }
       else {
@@ -63,8 +63,8 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
     def handleChildChange(parent: String, children: java.util.List[String]) {
       lock.synchronized {
         try {
-          if (zkClient != null) {
-            val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
+          if (zkUtils != null) {
+            val latestTopics = zkUtils.zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
             debug("all topics: %s".format(latestTopics))
             eventHandler.handleTopicEvent(latestTopics)
           }
@@ -87,9 +87,9 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
     @throws(classOf[Exception])
     def handleNewSession() {
       lock.synchronized {
-        if (zkClient != null) {
+        if (zkUtils != null) {
           info("ZK expired: resubscribing topic event listener to topic registry")
-          zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
+          zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
         }
       }
     }
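
The watcher shows the other half of the refactor: helpers that ZkUtils wraps (makeSurePersistentPathExists, deletePath, and friends) become instance calls, while zkclient APIs that ZkUtils does not wrap, such as the subscribe*/unsubscribe* family, reach through the exposed zkUtils.zkClient field. A hedged sketch of the two call styles side by side, for a caller that already holds a ZkUtils and a listener:

import kafka.utils.ZkUtils
import org.I0Itec.zkclient.IZkChildListener

def watchBrokerTopics(zkUtils: ZkUtils, listener: IZkChildListener): Unit = {
  // Wrapped helper: an instance method, no client argument any more.
  zkUtils.makeSurePersistentPathExists(ZkUtils.BrokerTopicsPath)
  // Unwrapped zkclient API: delegate through the exposed raw client.
  zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, listener)
}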

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a7b44ca..0a1a684 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -43,8 +43,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.server._
 import kafka.common.TopicAndPartition
 
-class ControllerContext(val zkClient: ZkClient,
-                        val zkConnection: ZkConnection,
+class ControllerContext(val zkUtils: ZkUtils,
                         val zkSessionTimeout: Int) {
   var controllerChannelManager: ControllerChannelManager = null
   val controllerLock: ReentrantLock = new ReentrantLock()
@@ -154,11 +153,11 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection: ZkConnection, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
-  val controllerContext = new ControllerContext(zkClient, zkConnection, config.zkSessionTimeoutMs)
+  val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
@@ -321,7 +320,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
       //read controller epoch from zk
       readControllerEpochFromZookeeper()
       // increment the controller epoch
-      incrementControllerEpoch(zkClient)
+      incrementControllerEpoch(zkUtils.zkClient)
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
       registerIsrChangeNotificationListener()
@@ -613,7 +612,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
       reassignedReplicas.toSet)
     reassignedPartitionContext.isrChangeListener = isrChangeListener
     // register listener on the leader and isr path to wait until they catch up with the current leader
-    zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+    zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
   }
 
   def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -703,7 +702,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
   def incrementControllerEpoch(zkClient: ZkClient) = {
     try {
       var newControllerEpoch = controllerContext.epoch + 1
-      val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient,
+      val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
         ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
       if(!updateSucceeded)
         throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
@@ -732,14 +731,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
   }
 
   private def registerSessionExpirationListener() = {
-    zkClient.subscribeStateChanges(new SessionExpirationListener())
+    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
   }
 
   private def initializeControllerContext() {
     // update controller cache with delete topic information
-    controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
-    controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
-    controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
+    controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
+    controllerContext.allTopics = zkUtils.getAllTopics().toSet
+    controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
     controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // update the leader and isr cache for all existing partitions from Zookeeper
@@ -756,7 +755,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
 
   private def initializePreferredReplicaElection() {
     // initialize preferred replica election state
-    val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
+    val partitionsUndergoingPreferredReplicaElection = zkUtils.getPartitionsUndergoingPreferredReplicaElection()
     // check if they are already completed or topic was deleted
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
@@ -774,7 +773,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
 
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
-    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
     // check if they are already completed or topic was deleted
     val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
@@ -793,7 +792,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
   }
 
   private def initializeTopicDeletion() {
-    val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet
+    val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
     val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
       replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
@@ -822,13 +821,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
   }
 
   def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
-    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions)
+    val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions)
     for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
       controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
   }
 
   private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
-    getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+    zkUtils.getLeaderAndIsrForPartition(topic, partition) match {
       case Some(leaderAndIsr) =>
         val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r))
         replicasNotInIsr.isEmpty
@@ -930,42 +929,42 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
 
   private def registerIsrChangeNotificationListener() = {
     debug("Registering IsrChangeNotificationListener")
-    zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
+    zkUtils.zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
   }
 
   private def deregisterIsrChangeNotificationListener() = {
     debug("De-registering IsrChangeNotificationListener")
-    zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
+    zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
   }
 
   private def registerReassignedPartitionsListener() = {
-    zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
+    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
   }
 
   private def deregisterReassignedPartitionsListener() = {
-    zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
+    zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
   }
 
   private def registerPreferredReplicaElectionListener() {
-    zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+    zkUtils.zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
   }
 
   private def deregisterPreferredReplicaElectionListener() {
-    zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+    zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
   }
 
   private def deregisterReassignedPartitionsIsrChangeListeners() {
     controllerContext.partitionsBeingReassigned.foreach {
       case (topicAndPartition, reassignedPartitionsContext) =>
-        val zkPartitionPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
-        zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
+        val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
+        zkUtils.zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
     }
   }
 
   private def readControllerEpochFromZookeeper() {
     // initialize the controller epoch and zk version by reading from zookeeper
-    if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) {
-      val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)
+    if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) {
+      val epochData = controllerContext.zkUtils.readData(ZkUtils.ControllerEpochPath)
       controllerContext.epoch = epochData._1.toInt
       controllerContext.epochZkVersion = epochData._2.getVersion
       info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
@@ -975,15 +974,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
     if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
       // stop watching the ISR changes for this partition
-      zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+      zkUtils.zkClient.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
         controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
     }
     // read the current list of reassigned partitions from zookeeper
-    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
     // remove this partition from that list
     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
-    ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
+    zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
     // update the cache. NO-OP if the partition's reassignment was never started
     controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
   }
@@ -991,9 +990,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
   def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
                                          newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
     try {
-      val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
-      val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
-      ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
+      val zkPath = getTopicPath(topicAndPartition.topic)
+      val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
+      zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
       case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
@@ -1014,7 +1013,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
       }
     }
     if (!isTriggeredByAutoRebalance)
-      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+      zkUtils.deletePath(ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
@@ -1057,7 +1056,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
       zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
         case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
           val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
@@ -1074,7 +1073,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
             // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
             // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
             // eventually be restored as the leader.
-            if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkClient,
+            if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkUtils,
               ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
               newIsr = leaderAndIsr.isr
@@ -1083,7 +1082,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
             val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
               newIsr, leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
-            val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition,
+            val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
               newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
 
             newLeaderAndIsr.zkVersion = newVersion
@@ -1120,7 +1119,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
       zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
         case Some(leaderIsrAndEpoch) =>
           val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
@@ -1134,7 +1133,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
           val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
                                                  leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
           // update the new leadership decision in zookeeper or retry
-          val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic,
+          val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic,
             partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
 
           newLeaderAndIsr.zkVersion = newVersion
@@ -1245,7 +1244,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
  */
 class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
   this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
-  val zkClient = controller.controllerContext.zkClient
+  val zkUtils = controller.controllerContext.zkUtils
   val controllerContext = controller.controllerContext
 
   /**
@@ -1256,7 +1255,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
   def handleDataChange(dataPath: String, data: Object) {
     debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
       .format(dataPath, data))
-    val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
+    val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString)
     val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
       partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
     }
@@ -1288,7 +1287,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
                                             reassignedReplicas: Set[Int])
   extends IZkDataListener with Logging {
   this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
-  val zkClient = controller.controllerContext.zkClient
+  val zkUtils = controller.controllerContext.zkUtils
   val controllerContext = controller.controllerContext
 
   /**
@@ -1305,7 +1304,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
         controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
           case Some(reassignedPartitionContext) =>
             // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
-            val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+            val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
             newLeaderAndIsrOpt match {
               case Some(leaderAndIsr) => // check if new replicas have joined ISR
                 val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
@@ -1359,7 +1358,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
         processUpdateNotifications(topicAndPartitions)
       } finally {
         // delete processed children
-        childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
+        childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
           ZkUtils.IsrChangeNotificationPath + "/" + x))
       }
     }
@@ -1373,7 +1372,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
 
   private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
     val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
-    val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode)
+    val (jsonOpt, stat) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
     if (jsonOpt.isDefined) {
       val json = Json.parseFull(jsonOpt.get)
 
@@ -1410,7 +1409,7 @@ object IsrChangeNotificationListener {
  */
 class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
   this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
-  val zkClient = controller.controllerContext.zkClient
+  val zkUtils = controller.controllerContext.zkUtils
   val controllerContext = controller.controllerContext
 
   /**
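
The controller hunks also show the read side of the new API: readData returns the payload together with its Stat, so one call yields both the value and the zk version that later conditional updates need. A minimal sketch of the epoch read performed in readControllerEpochFromZookeeper, under that assumption:

import kafka.utils.ZkUtils

// Returns (epoch, zkVersion) when the epoch znode exists, as in the hunk above.
def readControllerEpoch(zkUtils: ZkUtils): Option[(Int, Int)] =
  if (zkUtils.pathExists(ZkUtils.ControllerEpochPath)) {
    val (epochString, stat) = zkUtils.readData(ZkUtils.ControllerEpochPath)
    Some((epochString.toInt, stat.getVersion))
  } else
    None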

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 4ebeb5a..5eed382 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
           case true =>
             // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
             // for unclean leader election.
-            if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkClient,
+            if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
               ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
               throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                 "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 675a807..73b173e 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -22,7 +22,8 @@ import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.utils.{Logging, ZkUtils, ReplicationUtils}
+import kafka.utils.{Logging, ReplicationUtils}
+import kafka.utils.ZkUtils._
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.controller.Callbacks.CallbackBuilder
@@ -43,7 +44,7 @@ import kafka.utils.CoreUtils._
 class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val controllerContext = controller.controllerContext
   private val controllerId = controller.config.brokerId
-  private val zkClient = controllerContext.zkClient
+  private val zkUtils = controllerContext.zkUtils
   private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
@@ -83,7 +84,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     deregisterTopicChangeListener()
     addPartitionsListener.foreach {
       case (topic, listener) =>
-        zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
+        zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
     }
     addPartitionsListener.clear()
     if(controller.config.deleteTopicEnable)
@@ -289,9 +290,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           controller.epoch)
         debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
         try {
-          ZkUtils.createPersistentPath(controllerContext.zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-            ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
+          zkUtils.createPersistentPath(
+            getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+            zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
           // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
@@ -301,7 +302,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         } catch {
           case e: ZkNodeExistsException =>
             // read the controller epoch
-            val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
+            val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
               topicAndPartition.partition).get
             val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
                            "exists with value %s and controller epoch %d")
@@ -342,7 +343,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         }
         // elect new leader or throw exception
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
-        val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition,
+        val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
           leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
         newLeaderAndIsr = leaderAndIsr
         newLeaderAndIsr.zkVersion = newVersion
@@ -370,34 +371,34 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   private def registerTopicChangeListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
+    zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
   }
 
   private def deregisterTopicChangeListener() = {
-    zkClient.unsubscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
+    zkUtils.zkClient.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
   }
 
   def registerPartitionChangeListener(topic: String) = {
     addPartitionsListener.put(topic, new AddPartitionsListener(topic))
-    zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
+    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
   }
 
   def deregisterPartitionChangeListener(topic: String) = {
-    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
+    zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
     addPartitionsListener.remove(topic)
   }
 
   private def registerDeleteTopicListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
+    zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
   }
 
   private def deregisterDeleteTopicListener() = {
-    zkClient.unsubscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
+    zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
   }
 
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
+    ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) match {
       case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
         val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state"
@@ -426,7 +427,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             val deletedTopics = controllerContext.allTopics -- currentChildren
             controllerContext.allTopics = currentChildren
 
-            val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
+            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
             controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
               !deletedTopics.contains(p._1.topic))
             controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
@@ -449,7 +450,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    */
   class DeleteTopicsListener() extends IZkChildListener with Logging {
     this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
-    val zkClient = controllerContext.zkClient
+    val zkUtils = controllerContext.zkUtils
 
     /**
      * Invoked when a topic is being deleted
@@ -466,7 +467,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
         if(nonExistentTopics.size > 0) {
           warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
-          nonExistentTopics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic)))
+          nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
         }
         topicsToBeDeleted --= nonExistentTopics
         if(topicsToBeDeleted.size > 0) {
@@ -505,7 +506,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       inLock(controllerContext.controllerLock) {
         try {
           info("Add Partition triggered " + data.toString + " for path " + dataPath)
-          val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+          val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
           val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
             !controllerContext.partitionReplicaAssignment.contains(p._1))
           if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
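
Note the import change at the top of this file: constants and pure path builders stay static on the ZkUtils companion object, and import kafka.utils.ZkUtils._ lets call sites use them unqualified (BrokerTopicsPath, getTopicPath, getDeleteTopicPath above). A small sketch of that split, with a hypothetical topic name:

import kafka.utils.ZkUtils._ // static path helpers, untouched by this patch

val topicPath = getTopicPath("my-topic") // pure function, no ZooKeeper round trip
val leaderIsrPath = getTopicPartitionLeaderAndIsrPath("my-topic", 0)
// Stateful operations, by contrast, need a ZkUtils instance:
//   zkUtils.createPersistentPath(leaderIsrPath, data)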

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index acad83a..32ed288 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -47,7 +47,7 @@ import kafka.utils.CoreUtils._
 class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val controllerContext = controller.controllerContext
   private val controllerId = controller.config.brokerId
-  private val zkClient = controllerContext.zkClient
+  private val zkUtils = controllerContext.zkUtils
   private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
   private val brokerChangeListener = new BrokerChangeListener()
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
@@ -171,7 +171,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
         case NewReplica =>
           assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
           // start replica as a follower to the current leader for its partition
-          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
           leaderIsrAndControllerEpochOpt match {
             case Some(leaderIsrAndControllerEpoch) =>
               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
@@ -313,11 +313,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   }
 
   private def registerBrokerChangeListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+    zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
   }
 
   private def deregisterBrokerChangeListener() = {
-    zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+    zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
   }
 
   /**
@@ -359,10 +359,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
               val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
-              val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
+              val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
               val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
               val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
-              controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+              controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
               info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
                 .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
               newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
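
getBrokerInfo likewise moves onto the instance and returns an Option, so ids whose znode has already vanished simply drop out of the map above. The filter(_.isDefined).map(_.get) chain is equivalent to flattening; a sketch of the tighter form, assuming the same Option-returning signature:

import kafka.cluster.Broker
import kafka.utils.ZkUtils

// None results flatten away, leaving only brokers that still exist in ZK.
def liveBrokers(zkUtils: ZkUtils, ids: Set[Int]): Set[Broker] =
  ids.flatMap(zkUtils.getBrokerInfo)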

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 9e39dd5..c6f80ac 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -23,8 +23,9 @@ import org.apache.kafka.common.requests.{StopReplicaResponse, AbstractRequestRes
 
 import collection.mutable
 import collection.JavaConverters._
-import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
+import kafka.utils.{ShutdownableThread, Logging}
 import kafka.utils.CoreUtils._
+import kafka.utils.ZkUtils._
 import collection.Set
 import kafka.common.TopicAndPartition
 import java.util.concurrent.locks.ReentrantLock
@@ -288,9 +289,10 @@ class TopicDeletionManager(controller: KafkaController,
     partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
     topicsToBeDeleted -= topic
     partitionsToBeDeleted.retain(_.topic != topic)
-    controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
-    controllerContext.zkClient.deleteRecursive(ZkUtils.getEntityConfigPath(ConfigType.Topic, topic))
-    controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
+    val zkUtils = controllerContext.zkUtils
+    zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
+    zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
+    zkUtils.zkClient.delete(getDeleteTopicPath(topic))
     controllerContext.removeTopic(topic)
   }
 
@@ -385,7 +387,7 @@ class TopicDeletionManager(controller: KafkaController,
   }
 
   class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
-    val zkClient = controllerContext.zkClient
+    val zkUtils = controllerContext.zkUtils
     override def doWork() {
       awaitTopicDeletionNotification()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 68ff4fc..bf23e9b 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -43,7 +43,7 @@ class ConsumerCoordinator(val brokerId: Int,
                           val groupConfig: GroupManagerConfig,
                           val offsetConfig: OffsetManagerConfig,
                           private val offsetManager: OffsetManager,
-                          zkClient: ZkClient) extends Logging {
+                          zkUtils: ZkUtils) extends Logging {
 
   this.logIdent = "[ConsumerCoordinator " + brokerId + "]: "
 
@@ -57,9 +57,9 @@ class ConsumerCoordinator(val brokerId: Int,
            groupConfig: GroupManagerConfig,
            offsetConfig: OffsetManagerConfig,
            replicaManager: ReplicaManager,
-           zkClient: ZkClient,
+           zkUtils: ZkUtils,
            scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
-    new OffsetManager(offsetConfig, replicaManager, zkClient, scheduler), zkClient)
+    new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler), zkUtils)
 
   def offsetsTopicConfigs: Properties = {
     val props = new Properties
@@ -81,7 +81,7 @@ class ConsumerCoordinator(val brokerId: Int,
     info("Starting up.")
     heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
     rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId)
-    coordinatorMetadata = new CoordinatorMetadata(brokerId, zkClient, maybePrepareRebalance)
+    coordinatorMetadata = new CoordinatorMetadata(brokerId, zkUtils, maybePrepareRebalance)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -499,7 +499,7 @@ object ConsumerCoordinator {
   val OffsetsTopicName = "__consumer_offsets"
 
   def create(config: KafkaConfig,
-             zkClient: ZkClient,
+             zkUtils: ZkUtils,
              replicaManager: ReplicaManager,
              kafkaScheduler: KafkaScheduler): ConsumerCoordinator = {
     val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
@@ -513,11 +513,11 @@ object ConsumerCoordinator {
     val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
       consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
 
-    new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkClient, kafkaScheduler)
+    new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler)
   }
 
   def create(config: KafkaConfig,
-             zkClient: ZkClient,
+             zkUtils: ZkUtils,
              offsetManager: OffsetManager): ConsumerCoordinator = {
     val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
@@ -530,6 +530,6 @@ object ConsumerCoordinator {
     val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
       consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
 
-    new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkClient)
+    new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkUtils)
   }
 }

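The hunks above show the recurring shape of this whole patch: every constructor and factory that used to take a bare ZkClient now takes a ZkUtils handle, and the handle is threaded through otherwise unchanged. A minimal, self-contained Scala sketch of that translation (all names below are illustrative, not from the patch):

    // Before the refactor: stateless helpers take the client as an argument.
    class Client { def read(path: String): String = s"data@$path" }
    object OldHelpers { def readData(c: Client, path: String): String = c.read(path) }

    // After the refactor: a wrapper owns the client and exposes the helpers
    // as instance methods, so the client argument disappears at call sites.
    class Utils(val client: Client) {
      def readData(path: String): String = client.read(path)
    }

    object Demo extends App {
      val client = new Client
      println(OldHelpers.readData(client, "/brokers/topics")) // old call shape
      val utils = new Utils(client)
      println(utils.readData("/brokers/topics"))              // new call shape
    }
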
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index 2920320..a33231a 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -20,7 +20,7 @@ package kafka.coordinator
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{threadsafe, ZkUtils, Logging}
-
+import kafka.utils.ZkUtils._
 import org.I0Itec.zkclient.{ZkClient, IZkDataListener}
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -33,7 +33,7 @@ import scala.collection.mutable
  */
 @threadsafe
 private[coordinator] class CoordinatorMetadata(brokerId: Int,
-                                               zkClient: ZkClient,
+                                               zkUtils: ZkUtils,
                                                maybePrepareRebalance: ConsumerGroupMetadata => Unit) {
 
   /**
@@ -159,19 +159,19 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int,
   }
 
   private def getTopicPartitionCountFromZK(topic: String) = {
-    val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic))
+    val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
     topicData(topic).size
   }
 
   private def registerTopicPartitionChangeListener(topic: String) {
     val listener = new TopicPartitionChangeListener
     topicPartitionChangeListeners.put(topic, listener)
-    zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
+    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), listener)
   }
 
   private def deregisterTopicPartitionChangeListener(topic: String) {
     val listener = topicPartitionChangeListeners(topic)
-    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
+    zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
     topicPartitionChangeListeners.remove(topic)
   }
 

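Note the added import kafka.utils.ZkUtils._ : pure path helpers such as getTopicPath stay on the ZkUtils companion object and can now be called unqualified, stateful operations move to the zkUtils instance, and listener plumbing that ZkUtils does not wrap is reached through zkUtils.zkClient. A fragment showing the three cases, assuming only the names visible in the diff:

    // Companion-object helper: pure path arithmetic, no ZooKeeper round trip.
    val path = getTopicPath(topic)                // "/brokers/topics/" + topic

    // Instance method: goes through the wrapped (possibly secured) client.
    val assignment = zkUtils.getPartitionAssignmentForTopics(Seq(topic))

    // Escape hatch: raw zkclient APIs via the client the wrapper carries.
    zkUtils.zkClient.subscribeDataChanges(path, listener)
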
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 2e5ee8d..9b4314e 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import scala.collection.JavaConverters._
 import org.apache.log4j.Logger
@@ -59,7 +60,7 @@ object SimpleAclAuthorizer {
   //notification node which gets updated with the resource name when acl on a resource is changed.
   val AclChangedZkPath = "/kafka-acl-changes"
 
-  //prefix of all the change notificiation sequence node.
+  //prefix of all the change notification sequence node.
   val AclChangedPrefix = "acl_changes_"
 }
 
@@ -67,7 +68,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
   private var superUsers = Set.empty[KafkaPrincipal]
   private var shouldAllowEveryoneIfNoAclIsFound = false
-  private var zkClient: ZkClient = null
+  private var zkUtils: ZkUtils = null
   private var aclChangeListener: ZkNodeChangeNotificationListener = null
 
   private val aclCache = new scala.collection.mutable.HashMap[Resource, Set[Acl]]
@@ -92,16 +93,19 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     val zkConnectionTimeoutMs = configs.getOrElse(SimpleAclAuthorizer.ZkConnectionTimeOutProp, kafkaConfig.zkConnectionTimeoutMs).toString.toInt
     val zkSessionTimeOutMs = configs.getOrElse(SimpleAclAuthorizer.ZkSessionTimeOutProp, kafkaConfig.zkSessionTimeoutMs).toString.toInt
 
-    zkClient = ZkUtils.createZkClient(zkUrl, zkConnectionTimeoutMs, zkSessionTimeOutMs)
-    ZkUtils.makeSurePersistentPathExists(zkClient, SimpleAclAuthorizer.AclZkPath)
+    zkUtils = ZkUtils(zkUrl,
+                      zkConnectionTimeoutMs,
+                      zkSessionTimeOutMs,
+                      JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
 
     loadCache()
 
-    ZkUtils.makeSurePersistentPathExists(zkClient, SimpleAclAuthorizer.AclChangedZkPath)
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler)
+    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler)
     aclChangeListener.init()
 
-    zkClient.subscribeStateChanges(ZkStateChangeListener)
+    zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
   }
 
   override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
@@ -162,17 +166,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       val updatedAcls = getAcls(resource) ++ acls
       val path = toResourcePath(resource)
 
-      if (ZkUtils.pathExists(zkClient, path))
-        ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
+      if (zkUtils.pathExists(path))
+        zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
       else
-        ZkUtils.createPersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
+        zkUtils.createPersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
 
       updateAclChangedFlag(resource)
     }
   }
 
   override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
-    if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
+    if (zkUtils.pathExists(toResourcePath(resource))) {
       val existingAcls = getAcls(resource)
       val filteredAcls = existingAcls.filter((acl: Acl) => !aclsTobeRemoved.contains(acl))
 
@@ -180,9 +184,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
       if (aclNeedsRemoval) {
         val path: String = toResourcePath(resource)
         if (filteredAcls.nonEmpty)
-          ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
+          zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
         else
-          ZkUtils.deletePath(zkClient, toResourcePath(resource))
+          zkUtils.deletePath(toResourcePath(resource))
 
         updateAclChangedFlag(resource)
       }
@@ -192,8 +196,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   override def removeAcls(resource: Resource): Boolean = {
-    if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
-      ZkUtils.deletePath(zkClient, toResourcePath(resource))
+    if (zkUtils.pathExists(toResourcePath(resource))) {
+      zkUtils.deletePath(toResourcePath(resource))
       updateAclChangedFlag(resource)
       true
     } else false
@@ -206,7 +210,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   private def getAclsFromZk(resource: Resource): Set[Acl] = {
-    val aclJson = ZkUtils.readDataMaybeNull(zkClient, toResourcePath(resource))._1
+    val aclJson = zkUtils.readDataMaybeNull(toResourcePath(resource))._1
     aclJson.map(Acl.fromJson).getOrElse(Set.empty)
   }
 
@@ -224,11 +228,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   private def loadCache()  {
     var acls = Set.empty[Acl]
-    val resourceTypes = ZkUtils.getChildren(zkClient, SimpleAclAuthorizer.AclZkPath)
+    val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
     for (rType <- resourceTypes) {
       val resourceType = ResourceType.fromString(rType)
       val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
-      val resourceNames = ZkUtils.getChildren(zkClient, resourceTypePath)
+      val resourceNames = zkUtils.getChildren(resourceTypePath)
       for (resourceName <- resourceNames) {
         acls = getAclsFromZk(Resource(resourceType, resourceName.toString))
         updateCache(new Resource(resourceType, resourceName), acls)
@@ -255,7 +259,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   private def updateAclChangedFlag(resource: Resource) {
-    ZkUtils.createSequentialPersistentPath(zkClient, SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
+    zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
   }
 
   object AclChangedNotificaitonHandler extends NotificationHandler {

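The authorizer now decides whether its ZooKeeper session should be secure by inspecting the JVM's JAAS login configuration, the same check used at every ZkUtils construction site in this patch. That check in isolation (property value illustrative):

    import org.apache.kafka.common.security.JaasUtils

    // JaasUtils.JAVA_LOGIN_CONFIG_PARAM is "java.security.auth.login.config",
    // normally supplied via -Djava.security.auth.login.config=/path/jaas.conf
    val loginConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
    val zkIsSecure = JaasUtils.isZkSecurityEnabled(loginConfigFile)
    // zkIsSecure becomes the last ZkUtils(...) argument, so paths such as
    // SimpleAclAuthorizer.AclZkPath are created with matching ACLs.
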
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 4da1833..d443a1f 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -70,7 +70,7 @@ object ConfigType {
  * on startup where a change might be missed between the initial config load and registering for change notifications.
  *
  */
-class DynamicConfigManager(private val zkClient: ZkClient,
+class DynamicConfigManager(private val zkUtils: ZkUtils,
                            private val configHandler : Map[String, ConfigHandler],
                            private val changeExpirationMs: Long = 15*60*1000,
                            private val time: Time = SystemTime) extends Logging {
@@ -80,8 +80,8 @@ class DynamicConfigManager(private val zkClient: ZkClient,
    * Begin watching for config changes
    */
   def startup() {
-    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.EntityConfigChangesPath)
-    zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
+    zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
+    zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
     processAllConfigChanges()
   }
 
@@ -89,7 +89,7 @@ class DynamicConfigManager(private val zkClient: ZkClient,
    * Process all config changes
    */
   private def processAllConfigChanges() {
-    val configChanges = zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
+    val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
     import JavaConversions._
     processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
   }
@@ -107,7 +107,7 @@ class DynamicConfigManager(private val zkClient: ZkClient,
         if (changeId > lastExecutedChange) {
           val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
 
-          val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+          val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
           processNotification(jsonOpt)
         }
         lastExecutedChange = changeId
@@ -138,7 +138,7 @@ class DynamicConfigManager(private val zkClient: ZkClient,
             case Some(value: String) => value
             case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
           }
-          configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkClient, entityType, entity))
+          configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkUtils, entityType, entity))
 
         case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
                                                              "{\"version\" : 1," +
@@ -151,12 +151,12 @@ class DynamicConfigManager(private val zkClient: ZkClient,
 
   private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
     for(notification <- notifications.sorted) {
-      val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.EntityConfigChangesPath + "/" + notification)
+      val (jsonOpt, stat) = zkUtils.readDataMaybeNull(ZkUtils.EntityConfigChangesPath + "/" + notification)
       if(jsonOpt.isDefined) {
         val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
         if (now - stat.getCtime > changeExpirationMs) {
           debug("Purging config change notification " + notification)
-          ZkUtils.deletePath(zkClient, changeZnode)
+          zkUtils.deletePath(changeZnode)
         } else {
           return
         }

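processNotification above expects a small JSON payload naming the changed entity and throws IllegalArgumentException otherwise, while purgeObsoleteNotifications deletes notifications by znode age. Both rules, sketched with illustrative values:

    // A change notification matching the keys checked in the diff
    // ("entity_type"/"entity_name"); the field values here are made up.
    val notification = """{"version" : 1, "entity_type" : "topics", "entity_name" : "my-topic"}"""

    // Purge criterion: stat.getCtime is the znode creation time in ms, and
    // changeExpirationMs defaults to 15*60*1000 = 900000 ms (15 minutes).
    def isExpired(nowMs: Long, ctimeMs: Long, expirationMs: Long = 15 * 60 * 1000): Boolean =
      nowMs - ctimeMs > expirationMs
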
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5715626..6acab8d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -44,7 +44,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val coordinator: ConsumerCoordinator,
                 val controller: KafkaController,
-                val zkClient: ZkClient,
+                val zkUtils: ZkUtils,
                 val brokerId: Int,
                 val config: KafkaConfig,
                 val metadataCache: MetadataCache,
@@ -221,7 +221,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             } else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
               (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
             } else {
-              ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+              zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" +
                 topicAndPartition.partition, metaAndError.offset.toString)
               (topicAndPartition, ErrorMapping.NoError)
             }
@@ -535,14 +535,14 @@ class KafkaApis(val requestChannel: RequestChannel,
                   Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
                 else
                   config.offsetsTopicReplicationFactor.toInt
-              AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
+              AdminUtils.createTopic(zkUtils, topic, config.offsetsTopicPartitions,
                                      offsetsTopicReplicationFactor,
                                      coordinator.offsetsTopicConfigs)
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                 .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
             }
             else {
-              AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+              AdminUtils.createTopic(zkUtils, topic, config.numPartitions, config.defaultReplicationFactor)
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                    .format(topic, config.numPartitions, config.defaultReplicationFactor))
             }
@@ -624,7 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
             (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
           } else {
-            val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
+            val payloadOpt = zkUtils.readDataMaybeNull(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
             payloadOpt match {
               case Some(payload) => (topicAndPartition, OffsetMetadataAndError(payload.toLong))
               case None => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)

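Both offset hunks above read and write one znode per partition under the consumer group's offset directory. A hedged sketch of the path construction, assuming kafka.utils.ZKGroupTopicDirs as used elsewhere in this codebase (group, topic and partition values illustrative):

    import kafka.utils.ZKGroupTopicDirs

    val topicDirs  = new ZKGroupTopicDirs("my-group", "my-topic")
    val offsetPath = topicDirs.consumerOffsetDir + "/" + 0   // partition 0
    // expected shape: /consumers/my-group/offsets/my-topic/0
    // commit: zkUtils.updatePersistentPath(offsetPath, "42")
    // fetch:  zkUtils.readDataMaybeNull(offsetPath)._1.map(_.toLong)
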
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 16760d4..928ff43 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -35,14 +35,13 @@ import java.net.InetAddress
  */
 class KafkaHealthcheck(private val brokerId: Int,
                        private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
-                       private val zkClient: ZkClient,
-                       private val zkConnection: ZkConnection) extends Logging {
+                       private val zkUtils: ZkUtils) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   val sessionExpireListener = new SessionExpireListener
 
   def startup() {
-    zkClient.subscribeStateChanges(sessionExpireListener)
+    zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
     register()
   }
 
@@ -62,7 +61,7 @@ class KafkaHealthcheck(private val brokerId: Int,
     // only PLAINTEXT is supported as default
     // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
     val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
-    ZkUtils.registerBrokerInZk(zkClient, zkConnection, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
+    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
   }
 
   /**

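A hedged reading of the healthcheck flow above: the state listener is wired through the wrapped client, and registration writes the broker znode with the PLAINTEXT host/port in the legacy fields plus the full endpoint map alongside; when no PLAINTEXT listener exists, the empty EndPoint(null, -1, null) keeps the znode writable while leaving older clients unable to connect. The two calls, isolated from the class:

    zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port,
                               updatedEndpoints, jmxPort)
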
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 99a3c12..f50c266 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkRece
 import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend}
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.utils.AppInfoParser
 
@@ -128,8 +129,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   var kafkaHealthcheck: KafkaHealthcheck = null
   val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
 
-  var zkClient: ZkClient = null
-  var zkConnection: ZkConnection = null
+  var zkUtils: ZkUtils = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
@@ -165,12 +165,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaScheduler.startup()
 
         /* setup zookeeper */
-        val (client, connection) = initZk()
-        zkClient = client
-        zkConnection = connection
+        zkUtils = initZk()
 
         /* start log manager */
-        logManager = createLogManager(zkClient, brokerState)
+        logManager = createLogManager(zkUtils.zkClient, brokerState)
         logManager.startup()
 
         /* generate brokerId */
@@ -181,16 +179,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         socketServer.startup()
 
         /* start replica manager */
-        replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkClient, kafkaScheduler, logManager,
+        replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
           isShuttingDown)
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkClient, zkConnection, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
         kafkaController.startup()
 
         /* start kafka coordinator */
-        consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler)
+        consumerCoordinator = ConsumerCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
         consumerCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/
@@ -204,7 +202,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
-          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer)
+          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
 
@@ -213,7 +211,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
                                                            ConfigType.Client -> new ClientIdConfigHandler)
-        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
+        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
         /* tell everyone we are alive */
@@ -223,7 +221,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           else
             (protocol, endpoint)
         }
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkClient, zkConnection)
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
         kafkaHealthcheck.startup()
 
         /* register broker metrics */
@@ -245,7 +243,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     }
   }
 
-  private def initZk(): (ZkClient, ZkConnection) = {
+  private def initZk(): ZkUtils = {
     info("Connecting to zookeeper on " + config.zkConnect)
 
     val chroot = {
@@ -257,15 +255,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
     if (chroot.length > 1) {
       val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
-      val zkClientForChrootCreation = ZkUtils.createZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
-      ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
+      val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation, 
+                                              config.zkSessionTimeoutMs,
+                                              config.zkConnectionTimeoutMs,
+                                              JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+      zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
       info("Created zookeeper path " + chroot)
-      zkClientForChrootCreation.close()
+      zkClientForChrootCreation.zkClient.close()
     }
 
-    val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
-    ZkUtils.setupCommonPaths(zkClient)
-    (zkClient, zkConnection)
+    val zkUtils = ZkUtils(config.zkConnect,
+                          config.zkSessionTimeoutMs,
+                          config.zkConnectionTimeoutMs,
+                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+    zkUtils.setupCommonPaths()
+    zkUtils
   }
 
 
@@ -334,8 +338,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
           // Get the current controller info. This is to ensure we use the most recent info to issue the
           // controlled shutdown request
-          val controllerId = ZkUtils.getController(zkClient)
-          ZkUtils.getBrokerInfo(zkClient, controllerId) match {
+          val controllerId = zkUtils.getController()
+          zkUtils.getBrokerInfo(controllerId) match {
             case Some(broker) =>
               // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
               // attempt, connect to the most recent controller
@@ -410,8 +414,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
           // Get the current controller info. This is to ensure we use the most recent info to issue the
           // controlled shutdown request
-          val controllerId = ZkUtils.getController(zkClient)
-          ZkUtils.getBrokerInfo(zkClient, controllerId) match {
+          val controllerId = zkUtils.getController()
+          zkUtils.getBrokerInfo(controllerId) match {
             case Some(broker) =>
               if (channel == null || prevController == null || !prevController.equals(broker)) {
                 // if this is the first attempt or if the controller has changed, create a channel to the most recent
@@ -524,8 +528,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           CoreUtils.swallow(consumerCoordinator.shutdown())
         if(kafkaController != null)
           CoreUtils.swallow(kafkaController.shutdown())
-        if(zkClient != null)
-          CoreUtils.swallow(zkClient.close())
+        if(zkUtils != null)
+          CoreUtils.swallow(zkUtils.close())
         if (metrics != null)
           CoreUtils.swallow(metrics.close())
 
@@ -559,7 +563,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
-    val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
+    val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).mapValues(LogConfig.fromProps(defaultProps, _))
     // read the log configurations from zookeeper
     val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
                                       dedupeBufferSize = config.logCleanerDedupeBufferSize,
@@ -626,7 +630,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
   private def generateBrokerId: Int = {
     try {
-      ZkUtils.getBrokerSequenceId(zkClient, config.maxReservedBrokerId)
+      zkUtils.getBrokerSequenceId(config.maxReservedBrokerId)
     } catch {
       case e: Exception =>
         error("Failed to generate broker.id due to ", e)

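initZk now bootstraps in two steps: when the connect string carries a chroot, a short-lived ZkUtils is opened against the chroot-less address just to create the chroot path (and is closed through its inner zkClient so the bootstrap session does not linger), then the long-lived ZkUtils is opened against the full connect string. A sketch of the string split this relies on (connect string illustrative; the chroot derivation itself is elided from the hunk above):

    val zkConnect = "zk1:2181,zk2:2181/kafka"
    val slash     = zkConnect.indexOf("/")
    val hostsOnly = zkConnect.substring(0, slash) // "zk1:2181,zk2:2181" -> used to create the chroot
    val chroot    = zkConnect.substring(slash)    // "/kafka"            -> makeSurePersistentPathExists
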
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0e613e7..bdc3bb6 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -87,7 +87,7 @@ object OffsetManagerConfig {
 
 class OffsetManager(val config: OffsetManagerConfig,
                     replicaManager: ReplicaManager,
-                    zkClient: ZkClient,
+                    zkUtils: ZkUtils,
                     scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
 
   /* offsets and metadata cache */
@@ -449,7 +449,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    */
   private def getOffsetsTopicPartitionCount = {
     val topic = ConsumerCoordinator.OffsetsTopicName
-    val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic))
+    val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
     if (topicData(topic).nonEmpty)
       topicData(topic).size
     else

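getPartitionAssignmentForTopics returns, per topic, a map from partition id to its assigned replica list, so topicData(topic).size is the partition count of the offsets topic. The result shape, illustrated with made-up assignments:

    // e.g. Map("__consumer_offsets" -> Map(0 -> Seq(1, 2), 1 -> Seq(2, 3)))
    val topicData      = zkUtils.getPartitionAssignmentForTopics(Seq("__consumer_offsets"))
    val partitionCount = topicData("__consumer_offsets").size
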
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 82a6001..0a17fd0 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -144,7 +144,7 @@ class ReplicaFetcherThread(name: String,
       // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
       // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
       // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkClient,
+      if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
         ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
         fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +

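The guard above re-reads the topic's dynamic config from ZooKeeper before truncating, so an unclean.leader.election.enable override that changed while the replica was down is still honored. The lookup, isolated from the method (topic name illustrative):

    // Per-topic overrides fetched from ZK, layered over broker defaults:
    val topicProps = AdminUtils.fetchEntityConfig(replicaMgr.zkUtils, ConfigType.Topic, "my-topic")
    val uncleanOk  = LogConfig.fromProps(brokerConfig.originals, topicProps).uncleanLeaderElectionEnable
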
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c0fec67..1fc47f4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -99,7 +99,7 @@ class ReplicaManager(val config: KafkaConfig,
                      metrics: Metrics,
                      time: Time,
                      jTime: JTime,
-                     val zkClient: ZkClient,
+                     val zkUtils: ZkUtils,
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
@@ -163,7 +163,7 @@ class ReplicaManager(val config: KafkaConfig,
   def maybePropagateIsrChanges() {
     isrChangeSet synchronized {
       if (isrChangeSet.nonEmpty) {
-        ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet)
+        ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
         isrChangeSet.clear()
       }
     }
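maybePropagateIsrChanges drains a shared set under its monitor: publish an immutable snapshot, then clear, so no entry is lost between the publish and the reset (assuming writers synchronize on the same set, as the surrounding class does). The same pattern, self-contained with illustrative names:

    import scala.collection.mutable

    object IsrDrainDemo extends App {
      val isrChangeSet = mutable.Set("t0-0", "t0-1")   // pending topic-partitions

      def maybePropagate(publish: Set[String] => Unit): Unit = isrChangeSet.synchronized {
        if (isrChangeSet.nonEmpty) {
          publish(isrChangeSet.toSet)  // hand off an immutable snapshot
          isrChangeSet.clear()         // clear only after publish returns
        }
      }

      maybePropagate(changed => println("propagating ISR changes: " + changed))
    }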