You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/19 00:24:08 UTC

[3/5] kafka git commit: KAFKA-2639: Refactoring of ZkUtils

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index b283e0a..e4e1e9c 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
 import kafka.controller.KafkaController
+import org.apache.kafka.common.security.JaasUtils
 
 /**
  * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -40,18 +41,18 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
   // create the election path in ZK, if one does not exist
   val index = electionPath.lastIndexOf("/")
   if (index > 0)
-    makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index))
+    controllerContext.zkUtils.makeSurePersistentPathExists(electionPath.substring(0, index))
   val leaderChangeListener = new LeaderChangeListener
 
   def startup {
     inLock(controllerContext.controllerLock) {
-      controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
+      controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
       elect
     }
   }
 
   private def getControllerID(): Int = {
-    readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+    controllerContext.zkUtils.readDataMaybeNull(electionPath)._1 match {
        case Some(controller) => KafkaController.parseControllerId(controller)
        case None => -1
     }
@@ -75,7 +76,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
     try {
       val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                       electString,
-                                                      controllerContext.zkConnection.getZookeeper)
+                                                      controllerContext.zkUtils.zkConnection.getZookeeper,
+                                                      JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
       zkCheckedEphemeral.create()
       info(brokerId + " successfully elected as leader")
       leaderId = brokerId
@@ -105,7 +107,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
 
   def resign() = {
     leaderId = -1
-    deletePath(controllerContext.zkClient, electionPath)
+    controllerContext.zkUtils.deletePath(electionPath)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index c39fbfe..87d9fb7 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -25,6 +25,7 @@ import kafka.consumer.SimpleConsumer
 import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest}
 import kafka.common.{OffsetMetadataAndError, ErrorMapping, BrokerNotAvailableException, TopicAndPartition}
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.JaasUtils
 import scala.collection._
 import kafka.client.ClientUtils
 import kafka.network.BlockingChannel
@@ -37,9 +38,9 @@ object ConsumerOffsetChecker extends Logging {
   private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
   private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()
 
-  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
+  private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = {
     try {
-      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
+      zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
         case Some(brokerInfoString) =>
           Json.parseFull(brokerInfoString) match {
             case Some(m) =>
@@ -60,15 +61,15 @@ object ConsumerOffsetChecker extends Logging {
     }
   }
 
-  private def processPartition(zkClient: ZkClient,
+  private def processPartition(zkUtils: ZkUtils,
                                group: String, topic: String, pid: Int) {
     val topicPartition = TopicAndPartition(topic, pid)
     val offsetOpt = offsetMap.get(topicPartition)
     val groupDirs = new ZKGroupTopicDirs(group, topic)
-    val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1
-    ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
+    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(pid))._1
+    zkUtils.getLeaderForPartition(topic, pid) match {
       case Some(bid) =>
-        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
+        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
         consumerOpt match {
           case Some(consumer) =>
             val topicAndPartition = TopicAndPartition(topic, pid)
@@ -86,11 +87,11 @@ object ConsumerOffsetChecker extends Logging {
     }
   }
 
-  private def processTopic(zkClient: ZkClient, group: String, topic: String) {
+  private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
     topicPidMap.get(topic) match {
       case Some(pids) =>
         pids.sorted.foreach {
-          pid => processPartition(zkClient, group, topic, pid)
+          pid => processPartition(zkUtils, group, topic, pid)
         }
       case None => // ignore
     }
@@ -148,19 +149,22 @@ object ConsumerOffsetChecker extends Logging {
 
     val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt)) else None
 
-    var zkClient: ZkClient = null
+    var zkUtils: ZkUtils = null
     var channel: BlockingChannel = null
     try {
-      zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
+      zkUtils = ZkUtils(zkConnect,
+                        30000,
+                        30000,
+                        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
 
       val topicList = topics match {
         case Some(x) => x.split(",").view.toList
-        case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir +  "/owners").toList
+        case None => zkUtils.getChildren(groupDirs.consumerGroupDir +  "/owners").toList
       }
 
-      topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
+      topicPidMap = immutable.Map(zkUtils.getPartitionsForTopics(topicList).toSeq:_*)
       val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
-      val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
+      val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
 
       debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
       channel.send(OffsetFetchRequest(group, topicPartitions))
@@ -173,11 +177,11 @@ object ConsumerOffsetChecker extends Logging {
           // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
           // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
           try {
-            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
+            val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
             offsetMap.put(topicAndPartition, offset)
           } catch {
             case z: ZkNoNodeException =>
-              if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir))
+              if(zkUtils.pathExists(topicDirs.consumerOffsetDir))
                 offsetMap.put(topicAndPartition,-1)
               else
                 throw z
@@ -193,7 +197,7 @@ object ConsumerOffsetChecker extends Logging {
 
       println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
       topicList.sorted.foreach {
-        topic => processTopic(zkClient, group, topic)
+        topic => processTopic(zkUtils, group, topic)
       }
 
       if (options.has("broker-info"))
@@ -216,8 +220,8 @@ object ConsumerOffsetChecker extends Logging {
           case None => // ignore
         }
       }
-      if (zkClient != null)
-        zkClient.close()
+      if (zkUtils != null)
+        zkUtils.close()
 
       if (channel != null)
         channel.disconnect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 7b52fe4..75d4fd1 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -21,6 +21,7 @@ import java.io.FileWriter
 import joptsimple._
 import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.security.JaasUtils
 
 
 /**
@@ -72,16 +73,19 @@ object ExportZkOffsets extends Logging {
     val groups     = options.valuesOf(groupOpt)
     val outfile    = options.valueOf(outFileOpt)
 
-    var zkClient   : ZkClient    = null
+    var zkUtils   : ZkUtils    = null
     val fileWriter : FileWriter  = new FileWriter(outfile)
     
     try {
-      zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
+      zkUtils = ZkUtils(zkConnect,
+                        30000,
+                        30000,
+                        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
       
       var consumerGroups: Seq[String] = null
 
       if (groups.size == 0) {
-        consumerGroups = ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).toList
+        consumerGroups = zkUtils.getChildren(ZkUtils.ConsumersPath).toList
       }
       else {
         import scala.collection.JavaConversions._
@@ -89,15 +93,15 @@ object ExportZkOffsets extends Logging {
       }
       
       for (consumerGrp <- consumerGroups) {
-        val topicsList = getTopicsList(zkClient, consumerGrp)
+        val topicsList = getTopicsList(zkUtils, consumerGrp)
         
         for (topic <- topicsList) {
-          val bidPidList = getBrokeridPartition(zkClient, consumerGrp, topic)
+          val bidPidList = getBrokeridPartition(zkUtils, consumerGrp, topic)
           
           for (bidPid <- bidPidList) {
             val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
             val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
-            ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 match {
+            zkUtils.readDataMaybeNull(offsetPath)._1 match {
               case Some(offsetVal) =>
                 fileWriter.write(offsetPath + ":" + offsetVal + "\n")
                 debug(offsetPath + " => " + offsetVal)
@@ -114,10 +118,10 @@ object ExportZkOffsets extends Logging {
     }
   }
 
-  private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] =
-    ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
+  private def getBrokeridPartition(zkUtils: ZkUtils, consumerGroup: String, topic: String): List[String] =
+    zkUtils.getChildrenParentMayNotExist("/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
   
-  private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] =
-    ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
+  private def getTopicsList(zkUtils: ZkUtils, consumerGroup: String): List[String] =
+    zkUtils.getChildren("/consumers/%s/offsets".format(consumerGroup)).toList
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index b56f587..38a71ae 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -22,6 +22,7 @@ import java.io.FileReader
 import joptsimple._
 import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.security.JaasUtils
 
 
 /**
@@ -68,10 +69,10 @@ object ImportZkOffsets extends Logging {
     val zkConnect           = options.valueOf(zkConnectOpt)
     val partitionOffsetFile = options.valueOf(inFileOpt)
 
-    val zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
+    val zkUtils = ZkUtils(zkConnect, 30000, 30000, JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
     val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)
 
-    updateZkOffsets(zkClient, partitionOffsets)
+    updateZkOffsets(zkUtils, partitionOffsets)
   }
 
   private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
@@ -92,12 +93,12 @@ object ImportZkOffsets extends Logging {
     partOffsetsMap
   }
   
-  private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
+  private def updateZkOffsets(zkUtils: ZkUtils, partitionOffsets: Map[String,String]): Unit = {
     for ((partition, offset) <- partitionOffsets) {
       debug("updating [" + partition + "] with offset [" + offset + "]")
       
       try {
-        ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
+        zkUtils.updatePersistentPath(partition, offset.toString)
       } catch {
         case e: Throwable => e.printStackTrace()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 9942686..95dd2a6 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -23,6 +23,7 @@ import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.{TopicAndPartition, KafkaException}
 import kafka.utils.{ZKGroupTopicDirs, ZkUtils, CoreUtils}
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
 
 /**
@@ -36,17 +37,17 @@ object UpdateOffsetsInZK {
     if(args.length < 3)
       usage
     val config = new ConsumerConfig(Utils.loadProps(args(1)))
-    val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs,
-        config.zkConnectionTimeoutMs)
+    val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs,
+        config.zkConnectionTimeoutMs, JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
     args(0) match {
-      case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
-      case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))
+      case Earliest => getAndSetOffsets(zkUtils, OffsetRequest.EarliestTime, config, args(2))
+      case Latest => getAndSetOffsets(zkUtils, OffsetRequest.LatestTime, config, args(2))
       case _ => usage
     }
   }
 
-  private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
-    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic))
+  private def getAndSetOffsets(zkUtils: ZkUtils, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
+    val partitionsPerTopicMap = zkUtils.getPartitionsForTopics(List(topic))
     var partitions: Seq[Int] = Nil
 
     partitionsPerTopicMap.get(topic) match {
@@ -56,7 +57,7 @@ object UpdateOffsetsInZK {
 
     var numParts = 0
     for (partition <- partitions) {
-      val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+      val brokerHostingPartition = zkUtils.getLeaderForPartition(topic, partition)
 
       val broker = brokerHostingPartition match {
         case Some(b) => b
@@ -64,7 +65,7 @@ object UpdateOffsetsInZK {
           "getOffsetsBefore request")
       }
 
-      ZkUtils.getBrokerInfo(zkClient, broker) match {
+      zkUtils.getBrokerInfo(broker) match {
         case Some(brokerInfo) =>
           val consumer = new SimpleConsumer(brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host,
                                             brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port,
@@ -75,7 +76,7 @@ object UpdateOffsetsInZK {
           val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
           println("updating partition " + partition + " with new offset: " + offset)
-          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
+          zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
           numParts += 1
         case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index db2721f..5a505c6 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -19,6 +19,8 @@ package kafka.tools
 
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.security._
+
 import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, CommandLineUtils}
 
 object VerifyConsumerRebalance extends Logging {
@@ -46,15 +48,18 @@ object VerifyConsumerRebalance extends Logging {
     val zkConnect = options.valueOf(zkConnectOpt)
     val group = options.valueOf(groupOpt)
 
-    var zkClient: ZkClient = null
+    var zkUtils: ZkUtils = null
     try {
-      zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
+      zkUtils = ZkUtils(zkConnect,
+                        30000,
+                        30000, 
+                        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
 
       debug("zkConnect = %s; group = %s".format(zkConnect, group))
 
       // check if the rebalancing operation succeeded.
       try {
-        if(validateRebalancingOperation(zkClient, group))
+        if(validateRebalancingOperation(zkUtils, group))
           println("Rebalance operation successful !")
         else
           println("Rebalance operation failed !")
@@ -63,12 +68,12 @@ object VerifyConsumerRebalance extends Logging {
       }
     }
     finally {
-      if (zkClient != null)
-        zkClient.close()
+      if (zkUtils != null)
+        zkUtils.close()
     }
   }
 
-  private def validateRebalancingOperation(zkClient: ZkClient, group: String): Boolean = {
+  private def validateRebalancingOperation(zkUtils: ZkUtils, group: String): Boolean = {
     info("Verifying rebalancing operation for consumer group " + group)
     var rebalanceSucceeded: Boolean = true
     /**
@@ -76,14 +81,14 @@ object VerifyConsumerRebalance extends Logging {
      * This means that for each partition registered under /brokers/topics/[topic]/[broker-id], an owner exists
      * under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]
      */
-    val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics = false)
-    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)
+    val consumersPerTopicMap = zkUtils.getConsumersPerTopic(group, excludeInternalTopics = false)
+    val partitionsPerTopicMap = zkUtils.getPartitionsForTopics(consumersPerTopicMap.keySet.toSeq)
 
     partitionsPerTopicMap.foreach { case (topic, partitions) =>
       val topicDirs = new ZKGroupTopicDirs(group, topic)
       info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
       info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))
-      val partitionsWithOwners = ZkUtils.getChildrenParentMayNotExist(zkClient, topicDirs.consumerOwnerDir)
+      val partitionsWithOwners = zkUtils.getChildrenParentMayNotExist(topicDirs.consumerOwnerDir)
       if(partitionsWithOwners.size == 0) {
         error("No owners for any partitions for topic " + topic)
         rebalanceSucceeded = false
@@ -100,7 +105,7 @@ object VerifyConsumerRebalance extends Logging {
         }
         // try reading the partition owner path for see if a valid consumer id exists there
         val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
-        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1 match {
+        val partitionOwner = zkUtils.readDataMaybeNull(partitionOwnerPath)._1 match {
           case Some(m) => m
           case None => null
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index d99629a..4074c0f 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -20,6 +20,7 @@ package kafka.utils
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
 import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
+import kafka.utils.ZkUtils._
 import org.I0Itec.zkclient.ZkClient
 import org.apache.zookeeper.data.Stat
 
@@ -29,26 +30,26 @@ object ReplicationUtils extends Logging {
 
   private val IsrChangeNotificationPrefix = "isr_change_"
 
-  def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
+  def updateLeaderAndIsr(zkUtils: ZkUtils, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
     zkVersion: Int): (Boolean,Int) = {
     debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(",")))
-    val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
-    val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
+    val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId)
+    val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val updatePersistentPath: (Boolean, Int) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
+    val updatePersistentPath: (Boolean, Int) = zkUtils.conditionalUpdatePersistentPath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
     updatePersistentPath
   }
 
-  def propagateIsrChanges(zkClient: ZkClient, isrChangeSet: Set[TopicAndPartition]): Unit = {
-    val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath(
-      zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
+  def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicAndPartition]): Unit = {
+    val isrChangeNotificationPath: String = zkUtils.createSequentialPersistentPath(
+      ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
       generateIsrChangeJson(isrChangeSet))
     debug("Added " + isrChangeNotificationPath + " for " + isrChangeSet)
   }
 
-  def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = {
+  def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = {
     try {
-      val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, path)
+      val writtenLeaderAndIsrInfo = zkUtils.readDataMaybeNull(path)
       val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
       val writtenStat = writtenLeaderAndIsrInfo._2
       val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
@@ -69,9 +70,9 @@ object ReplicationUtils extends Logging {
     (false,-1)
   }
 
-  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
-    val leaderAndIsrPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition)
-    val (leaderAndIsrOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, leaderAndIsrPath)
+  def getLeaderIsrAndEpochForPartition(zkUtils: ZkUtils, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
+    val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
+    val (leaderAndIsrOpt, stat) = zkUtils.readDataMaybeNull(leaderAndIsrPath)
     leaderAndIsrOpt.flatMap(leaderAndIsrStr => parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index e1cfa2e..17e63e2 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -17,7 +17,12 @@
 
 package kafka.utils
 
+import java.io.File
+import java.net.URI
+import java.security.URIParameter
+import javax.security.auth.login.Configuration
 import java.util.concurrent.CountDownLatch
+
 import kafka.cluster._
 import kafka.consumer.{ConsumerThreadId, TopicCount}
 import kafka.server.ConfigType
@@ -27,15 +32,19 @@ import org.I0Itec.zkclient.exception.{ZkException, ZkNodeExistsException, ZkNoNo
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.protocol.SecurityProtocol
-import collection._
+
+import org.apache.zookeeper.ZooDefs
+import scala.collection.JavaConverters._
+import scala.collection._
 import kafka.api.LeaderAndIsr
-import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.data.{ACL, Stat}
 import kafka.admin._
 import kafka.common.{KafkaException, NoEpochForPartitionException}
 import kafka.controller.ReassignedPartitionsContext
 import kafka.controller.KafkaController
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
+import kafka.utils.ZkUtils._
 
 import org.apache.zookeeper.AsyncCallback.{DataCallback,StringCallback}
 import org.apache.zookeeper.CreateMode
@@ -44,8 +53,7 @@ import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.ZooDefs.Ids
 import org.apache.zookeeper.ZooKeeper
 
-
-object ZkUtils extends Logging {
+object ZkUtils {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
@@ -58,17 +66,51 @@ object ZkUtils extends Logging {
   val IsrChangeNotificationPath = "/isr_change_notification"
   val EntityConfigPath = "/config"
   val EntityConfigChangesPath = "/config/changes"
-  // These are persistent ZK paths that should exist on kafka broker startup.
-  val persistentZkPaths = Seq(ConsumersPath,
-                              BrokerIdsPath,
-                              BrokerTopicsPath,
-                              EntityConfigChangesPath,
-                              ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
-                              ZkUtils.getEntityConfigRootPath(ConfigType.Client),
-                              DeleteTopicsPath,
-                              BrokerSequenceIdPath,
-                              IsrChangeNotificationPath)
+  
+  def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
+    val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
+    new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
+  }
+  
+  /*
+   * Used in tests
+   */
+  def apply(zkClient: ZkClient, isZkSecurityEnabled: Boolean): ZkUtils = {
+    new ZkUtils(zkClient, null, isZkSecurityEnabled)
+  }
+
+  def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
+    val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
+    zkClient
+  }
 
+  def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): (ZkClient, ZkConnection) = {
+    val zkConnection = new ZkConnection(zkUrl, sessionTimeout)
+    val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer)
+    (zkClient, zkConnection)
+  }
+  
+  def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = if (isSecure) {
+    val list = ZooDefs.Ids.CREATOR_ALL_ACL
+    list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
+    list
+  } else {
+    ZooDefs.Ids.OPEN_ACL_UNSAFE
+  }
+   
+  def maybeDeletePath(zkUrl: String, dir: String) {
+    try {
+      val zk = createZkClient(zkUrl, 30*1000, 30*1000)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _: Throwable => // swallow
+    }
+  }
+  
+  /*
+   * Get calls that only depend on static paths
+   */
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
   }
@@ -77,6 +119,12 @@ object ZkUtils extends Logging {
     getTopicPath(topic) + "/partitions"
   }
 
+  def getTopicPartitionPath(topic: String, partitionId: Int): String =
+    getTopicPartitionsPath(topic) + "/" + partitionId
+
+  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
+    getTopicPartitionPath(topic, partitionId) + "/" + "state"
+    
   def getEntityConfigRootPath(entityType: String): String =
     EntityConfigPath + "/" + entityType
 
@@ -85,43 +133,54 @@ object ZkUtils extends Logging {
 
   def getDeleteTopicPath(topic: String): String =
     DeleteTopicsPath + "/" + topic
+}
+
+class ZkUtils(val zkClient: ZkClient, 
+              val zkConnection: ZkConnection,
+              val isSecure: Boolean) extends Logging {
+  // These are persistent ZK paths that should exist on kafka broker startup.
+  val persistentZkPaths = Seq(ConsumersPath,
+                              BrokerIdsPath,
+                              BrokerTopicsPath,
+                              EntityConfigChangesPath,
+                              getEntityConfigRootPath(ConfigType.Topic),
+                              getEntityConfigRootPath(ConfigType.Client),
+                              DeleteTopicsPath,
+                              BrokerSequenceIdPath,
+                              IsrChangeNotificationPath)
 
-  def getController(zkClient: ZkClient): Int = {
-    readDataMaybeNull(zkClient, ControllerPath)._1 match {
+  val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
+  
+  def getController(): Int = {
+    readDataMaybeNull(ControllerPath)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }
 
-  def getTopicPartitionPath(topic: String, partitionId: Int): String =
-    getTopicPartitionsPath(topic) + "/" + partitionId
-
-  def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
-    getTopicPartitionPath(topic, partitionId) + "/" + "state"
-
-  def getSortedBrokerList(zkClient: ZkClient): Seq[Int] =
-    ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).sorted
+  def getSortedBrokerList(): Seq[Int] =
+    getChildren(BrokerIdsPath).map(_.toInt).sorted
 
-  def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
-    val brokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).sorted
-    brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+  def getAllBrokersInCluster(): Seq[Broker] = {
+    val brokerIds = getChildrenParentMayNotExist(BrokerIdsPath).sorted
+    brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get)
   }
 
-  def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndPoint] = {
-    getAllBrokersInCluster(zkClient).map(_.getBrokerEndPoint(protocolType))
+  def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = {
+    getAllBrokersInCluster().map(_.getBrokerEndPoint(protocolType))
   }
 
-  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
-    ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
+  def getLeaderAndIsrForPartition(topic: String, partition: Int):Option[LeaderAndIsr] = {
+    ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topic, partition).map(_.leaderAndIsr)
   }
 
-  def setupCommonPaths(zkClient: ZkClient) {
+  def setupCommonPaths() {
     for(path <- persistentZkPaths)
-      makeSurePersistentPathExists(zkClient, path)
+      makeSurePersistentPathExists(path)
   }
 
-  def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
-    val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
+  def getLeaderForPartition(topic: String, partition: Int): Option[Int] = {
+    val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
@@ -138,8 +197,8 @@ object ZkUtils extends Logging {
    * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
    * other broker will retry becoming leader with the same new epoch value.
    */
-  def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
-    val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
+  def getEpochForPartition(topic: String, partition: Int): Int = {
+    val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
@@ -155,15 +214,15 @@ object ZkUtils extends Logging {
     * users can provide brokerId in the config , inorder to avoid conflicts between zk generated
     * seqId and config.brokerId we increment zk seqId by KafkaConfig.MaxReservedBrokerId.
     */
-  def getBrokerSequenceId(zkClient: ZkClient, MaxReservedBrokerId: Int): Int = {
-    getSequenceId(zkClient, BrokerSequenceIdPath) + MaxReservedBrokerId
+  def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = {
+    getSequenceId(BrokerSequenceIdPath) + MaxReservedBrokerId
   }
 
   /**
    * Gets the in-sync replicas (ISR) for a specific topic and partition
    */
-  def getInSyncReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
-    val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
+  def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
+    val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
@@ -177,8 +236,8 @@ object ZkUtils extends Logging {
   /**
    * Gets the assigned replicas (AR) for a specific topic and partition
    */
-  def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
-    val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+  def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
+    val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) =>
         Json.parseFull(jsonPartitionMap) match {
@@ -204,21 +263,22 @@ object ZkUtils extends Logging {
    * @param timeout
    * @param jmxPort
    */
-  def registerBrokerInZk(zkClient: ZkClient, zkConnection: ZkConnection, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
-    val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
+  def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
+    val brokerIdPath = BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
 
     val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
-    registerBrokerInZk(zkClient, zkConnection, brokerIdPath, brokerInfo)
+    registerBrokerInZk(brokerIdPath, brokerInfo)
 
     info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
   }
 
-  private def registerBrokerInZk(zkClient: ZkClient, zkConnection: ZkConnection, brokerIdPath: String, brokerInfo: String) {
+  private def registerBrokerInZk(brokerIdPath: String, brokerInfo: String) {
     try {
       val zkCheckedEphemeral = new ZKCheckedEphemeral(brokerIdPath,
                                                       brokerInfo,
-                                                      zkConnection.getZookeeper)
+                                                      zkConnection.getZookeeper,
+                                                      isSecure)
       zkCheckedEphemeral.create()
     } catch {
       case e: ZkNodeExistsException =>
@@ -250,31 +310,36 @@ object ZkUtils extends Logging {
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
-  def makeSurePersistentPathExists(client: ZkClient, path: String) {
-    if (!client.exists(path))
-      ZkPath.createPersistent(client, path, true) //won't throw NoNodeException or NodeExistsException
+  def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = DefaultAcls) {
+    //Consumer path is kept open as different consumers will write under this node.
+    val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) {
+      ZooDefs.Ids.OPEN_ACL_UNSAFE
+    } else acls
+
+    if (!zkClient.exists(path))
+      ZkPath.createPersistent(zkClient, path, true, acl) //won't throw NoNodeException or NodeExistsException
   }
 
   /**
    *  create the parent path
    */
-  private def createParentPath(client: ZkClient, path: String): Unit = {
+  private def createParentPath(path: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
     val parentDir = path.substring(0, path.lastIndexOf('/'))
     if (parentDir.length != 0) {
-      ZkPath.createPersistent(client, parentDir, true)
+      ZkPath.createPersistent(zkClient, parentDir, true, acls)
     }
   }
 
   /**
    * Create an ephemeral node with the given path and data. Create parents if necessary.
    */
-  private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+  private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
     try {
-      ZkPath.createEphemeral(client, path, data)
+      ZkPath.createEphemeral(zkClient, path, data, acls)
     } catch {
       case e: ZkNoNodeException => {
-        createParentPath(client, path)
-        ZkPath.createEphemeral(client, path, data)
+        createParentPath(path)
+        ZkPath.createEphemeral(zkClient, path, data, acls)
       }
     }
   }
@@ -283,15 +348,15 @@ object ZkUtils extends Logging {
    * Create an ephemeral node with the given path and data.
    * Throw NodeExistException if node already exists.
    */
-  def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
+  def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
     try {
-      createEphemeralPath(client, path, data)
+      createEphemeralPath(path, data, acls)
     } catch {
       case e: ZkNodeExistsException => {
         // this can happen when there is connection loss; make sure the data is what we intend to write
         var storedData: String = null
         try {
-          storedData = readData(client, path)._1
+          storedData = readData(path)._1
         } catch {
           case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
           case e2: Throwable => throw e2
@@ -311,19 +376,19 @@ object ZkUtils extends Logging {
   /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
-  def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
+  def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = DefaultAcls): Unit = {
     try {
-      ZkPath.createPersistent(client, path, data)
+      ZkPath.createPersistent(zkClient, path, data, acls)
     } catch {
       case e: ZkNoNodeException => {
-        createParentPath(client, path)
-        ZkPath.createPersistent(client, path, data)
+        createParentPath(path)
+        ZkPath.createPersistent(zkClient, path, data, acls)
       }
     }
   }
 
-  def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
-    ZkPath.createPersistentSequential(client, path, data)
+  def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = DefaultAcls): String = {
+    ZkPath.createPersistentSequential(zkClient, path, data, acls)
   }
 
   /**
@@ -331,17 +396,17 @@ object ZkUtils extends Logging {
    * create parrent directory if necessary. Never throw NodeExistException.
    * Return the updated path zkVersion
    */
-  def updatePersistentPath(client: ZkClient, path: String, data: String) = {
+  def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls) = {
     try {
-      client.writeData(path, data)
+      zkClient.writeData(path, data)
     } catch {
       case e: ZkNoNodeException => {
-        createParentPath(client, path)
+        createParentPath(path)
         try {
-          ZkPath.createPersistent(client, path, data)
+          ZkPath.createPersistent(zkClient, path, data, acls)
         } catch {
           case e: ZkNodeExistsException =>
-            client.writeData(path, data)
+            zkClient.writeData(path, data)
           case e2: Throwable => throw e2
         }
       }
@@ -357,17 +422,17 @@ object ZkUtils extends Logging {
    * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one).
    * In this case, we will run the optionalChecker to further check if the previous write did indeed succeeded.
    */
-  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int,
-    optionalChecker:Option[(ZkClient, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
+  def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int,
+    optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
     try {
-      val stat = client.writeDataReturnStat(path, data, expectVersion)
+      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
       case e1: ZkBadVersionException =>
         optionalChecker match {
-          case Some(checker) => return checker(client, path, data)
+          case Some(checker) => return checker(this, path, data)
           case _ => debug("Checker method is not passed skipping zkData match")
         }
         warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
@@ -384,9 +449,9 @@ object ZkUtils extends Logging {
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the current
    * version is not the expected version, etc.) return (false, -1). If path doesn't exist, throws ZkNoNodeException
    */
-  def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+  def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
-      val stat = client.writeDataReturnStat(path, data, expectVersion)
+      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
@@ -403,21 +468,21 @@ object ZkUtils extends Logging {
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
    */
-  def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
+  def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
     try {
-      client.writeData(path, data)
+      zkClient.writeData(path, data)
     } catch {
       case e: ZkNoNodeException => {
-        createParentPath(client, path)
-        ZkPath.createEphemeral(client, path, data)
+        createParentPath(path)
+        ZkPath.createEphemeral(zkClient, path, data, acls)
       }
       case e2: Throwable => throw e2
     }
   }
 
-  def deletePath(client: ZkClient, path: String): Boolean = {
+  def deletePath(path: String): Boolean = {
     try {
-      client.delete(path)
+      zkClient.delete(path)
     } catch {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
@@ -427,9 +492,9 @@ object ZkUtils extends Logging {
     }
   }
 
-  def deletePathRecursive(client: ZkClient, path: String) {
+  def deletePathRecursive(path: String) {
     try {
-      client.deleteRecursive(path)
+      zkClient.deleteRecursive(path)
     } catch {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
@@ -438,26 +503,16 @@ object ZkUtils extends Logging {
     }
   }
 
-  def maybeDeletePath(zkUrl: String, dir: String) {
-    try {
-      val zk = createZkClient(zkUrl, 30*1000, 30*1000)
-      zk.deleteRecursive(dir)
-      zk.close()
-    } catch {
-      case _: Throwable => // swallow
-    }
-  }
-
-  def readData(client: ZkClient, path: String): (String, Stat) = {
+  def readData(path: String): (String, Stat) = {
     val stat: Stat = new Stat()
-    val dataStr: String = client.readData(path, stat)
+    val dataStr: String = zkClient.readData(path, stat)
     (dataStr, stat)
   }
 
-  def readDataMaybeNull(client: ZkClient, path: String): (Option[String], Stat) = {
+  def readDataMaybeNull(path: String): (Option[String], Stat) = {
     val stat: Stat = new Stat()
     val dataAndStat = try {
-                        (Some(client.readData(path, stat)), stat)
+                        (Some(zkClient.readData(path, stat)), stat)
                       } catch {
                         case e: ZkNoNodeException =>
                           (None, stat)
@@ -466,17 +521,17 @@ object ZkUtils extends Logging {
     dataAndStat
   }
 
-  def getChildren(client: ZkClient, path: String): Seq[String] = {
+  def getChildren(path: String): Seq[String] = {
     import scala.collection.JavaConversions._
     // triggers implicit conversion from java list to scala Seq
-    client.getChildren(path)
+    zkClient.getChildren(path)
   }
 
-  def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
+  def getChildrenParentMayNotExist(path: String): Seq[String] = {
     import scala.collection.JavaConversions._
     // triggers implicit conversion from java list to scala Seq
     try {
-      client.getChildren(path)
+      zkClient.getChildren(path)
     } catch {
       case e: ZkNoNodeException => Nil
       case e2: Throwable => throw e2
@@ -486,15 +541,15 @@ object ZkUtils extends Logging {
   /**
    * Check if the given path exists
    */
-  def pathExists(client: ZkClient, path: String): Boolean = {
-    client.exists(path)
+  def pathExists(path: String): Boolean = {
+    zkClient.exists(path)
   }
 
-  def getCluster(zkClient: ZkClient) : Cluster = {
+  def getCluster() : Cluster = {
     val cluster = new Cluster
-    val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
+    val nodes = getChildrenParentMayNotExist(BrokerIdsPath)
     for (node <- nodes) {
-      val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
+      val brokerZKString = readData(BrokerIdsPath + "/" + node)._1
       cluster.add(Broker.createBroker(node.toInt, brokerZKString))
     }
     cluster
@@ -504,7 +559,7 @@ object ZkUtils extends Logging {
   : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     for(topicAndPartition <- topicAndPartitions) {
-      ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) match {
+      ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match {
         case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
         case None =>
       }
@@ -512,10 +567,10 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
+  def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
     val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
     topics.foreach { topic =>
-      val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
       jsonPartitionMapOpt match {
         case Some(jsonPartitionMap) =>
           Json.parseFull(jsonPartitionMap) match {
@@ -536,10 +591,10 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
+  def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
     val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
     topics.foreach{ topic =>
-      val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
       val partitionMap = jsonPartitionMapOpt match {
         case Some(jsonPartitionMap) =>
           Json.parseFull(jsonPartitionMap) match {
@@ -559,8 +614,8 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getPartitionsForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
-    getPartitionAssignmentForTopics(zkClient, topics).map { topicAndPartitionMap =>
+  def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
+    getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
       val partitionMap = topicAndPartitionMap._2
       debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
@@ -568,9 +623,9 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getPartitionsBeingReassigned(zkClient: ZkClient): Map[TopicAndPartition, ReassignedPartitionsContext] = {
+  def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext] = {
     // read the partitions and their new replica list
-    val jsonPartitionMapOpt = readDataMaybeNull(zkClient, ReassignPartitionsPath)._1
+    val jsonPartitionMapOpt = readDataMaybeNull(ReassignPartitionsPath)._1
     jsonPartitionMapOpt match {
       case Some(jsonPartitionMap) =>
         val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
@@ -626,53 +681,53 @@ object ZkUtils extends Logging {
                                                                                           "replicas" -> e._2))))
   }
 
-  def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
-    val zkPath = ZkUtils.ReassignPartitionsPath
+  def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
+    val zkPath = ReassignPartitionsPath
     partitionsToBeReassigned.size match {
       case 0 => // need to delete the /admin/reassign_partitions path
-        deletePath(zkClient, zkPath)
+        deletePath(zkPath)
         info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
       case _ =>
         val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned)
         try {
-          updatePersistentPath(zkClient, zkPath, jsonData)
+          updatePersistentPath(zkPath, jsonData)
           debug("Updated partition reassignment path with %s".format(jsonData))
         } catch {
           case nne: ZkNoNodeException =>
-            ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+            createPersistentPath(zkPath, jsonData)
             debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
           case e2: Throwable => throw new AdminOperationException(e2.toString)
         }
     }
   }
 
-  def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition] = {
+  def getPartitionsUndergoingPreferredReplicaElection(): Set[TopicAndPartition] = {
     // read the partitions and their new replica list
-    val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1
+    val jsonPartitionListOpt = readDataMaybeNull(PreferredReplicaLeaderElectionPath)._1
     jsonPartitionListOpt match {
       case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList)
       case None => Set.empty[TopicAndPartition]
     }
   }
 
-  def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
+  def deletePartition(brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
     zkClient.delete(brokerIdPath)
     val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
     zkClient.delete(brokerPartTopicPath)
   }
 
-  def getConsumersInGroup(zkClient: ZkClient, group: String): Seq[String] = {
+  def getConsumersInGroup(group: String): Seq[String] = {
     val dirs = new ZKGroupDirs(group)
-    getChildren(zkClient, dirs.consumerRegistryDir)
+    getChildren(dirs.consumerRegistryDir)
   }
 
-  def getConsumersPerTopic(zkClient: ZkClient, group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {
+  def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {
     val dirs = new ZKGroupDirs(group)
-    val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
+    val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
     val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
     for (consumer <- consumers) {
-      val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient, excludeInternalTopics)
+      val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)
       for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
         for (consumerThreadId <- consumerThreadIdSet)
           consumersPerTopicMap.get(topic) match {
@@ -693,8 +748,8 @@ object ZkUtils extends Logging {
    * @param zkClient The zookeeper client connection
    * @return An optional Broker object encapsulating the broker metadata
    */
-  def getBrokerInfo(zkClient: ZkClient, brokerId: Int): Option[Broker] = {
-    ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+  def getBrokerInfo(brokerId: Int): Option[Broker] = {
+    readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
       case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo))
       case None => None
     }
@@ -705,27 +760,28 @@ object ZkUtils extends Logging {
     * It uses the stat returned by the zookeeper and return the version. Every time
     * client updates the path stat.version gets incremented
     */
-  def getSequenceId(client: ZkClient, path: String): Int = {
+  def getSequenceId(path: String, acls: java.util.List[ACL] = DefaultAcls): Int = {
     try {
-      val stat = client.writeDataReturnStat(path, "", -1)
+      val stat = zkClient.writeDataReturnStat(path, "", -1)
       stat.getVersion
     } catch {
       case e: ZkNoNodeException => {
-        createParentPath(client, BrokerSequenceIdPath)
+        createParentPath(BrokerSequenceIdPath, acls)
         try {
-          client.createPersistent(BrokerSequenceIdPath, "")
+          import scala.collection.JavaConversions._
+          zkClient.createPersistent(BrokerSequenceIdPath, "", acls)
           0
         } catch {
           case e: ZkNodeExistsException =>
-            val stat = client.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
+            val stat = zkClient.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
             stat.getVersion
         }
       }
     }
   }
 
-  def getAllTopics(zkClient: ZkClient): Seq[String] = {
-    val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+  def getAllTopics(): Seq[String] = {
+    val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
     if(topics == null)
       Seq.empty[String]
     else
@@ -735,53 +791,48 @@ object ZkUtils extends Logging {
   /**
    * Returns all the entities whose configs have been overridden.
    */
-  def getAllEntitiesWithConfig(zkClient: ZkClient, entityType: String): Seq[String] = {
-    val entities = ZkUtils.getChildrenParentMayNotExist(zkClient, getEntityConfigRootPath(entityType))
+  def getAllEntitiesWithConfig(entityType: String): Seq[String] = {
+    val entities = getChildrenParentMayNotExist(getEntityConfigRootPath(entityType))
     if(entities == null)
       Seq.empty[String]
     else
       entities
   }
 
-  def getAllPartitions(zkClient: ZkClient): Set[TopicAndPartition] = {
-    val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+  def getAllPartitions(): Set[TopicAndPartition] = {
+    val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
     if(topics == null) Set.empty[TopicAndPartition]
     else {
       topics.map { topic =>
-        getChildren(zkClient, getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _))
+        getChildren(getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _))
       }.flatten.toSet
     }
   }
 
-  def getConsumerGroups(zkClient: ZkClient) = {
-    ZkUtils.getChildren(zkClient, ConsumersPath)
+  def getConsumerGroups() = {
+    getChildren(ConsumersPath)
   }
 
-  def getTopicsByConsumerGroup(zkClient: ZkClient,consumerGroup:String) = {
-    ZkUtils.getChildrenParentMayNotExist(zkClient, new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir)
+  def getTopicsByConsumerGroup(consumerGroup:String) = {
+    getChildrenParentMayNotExist(new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir)
   }
 
-  def getAllConsumerGroupsForTopic(zkClient: ZkClient, topic: String): Set[String] = {
-    val groups = ZkUtils.getChildrenParentMayNotExist(zkClient, ConsumersPath)
+  def getAllConsumerGroupsForTopic(topic: String): Set[String] = {
+    val groups = getChildrenParentMayNotExist(ConsumersPath)
     if (groups == null) Set.empty
     else {
       groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) =>
-        val topics = getChildren(zkClient, new ZKGroupDirs(group).consumerGroupOffsetsDir)
+        val topics = getChildren(new ZKGroupDirs(group).consumerGroupOffsetsDir)
         if (topics.contains(topic)) consumerGroupsForTopic + group
         else consumerGroupsForTopic
       }
     }
   }
-
-  def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
-    val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
-    zkClient
-  }
-
-  def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): (ZkClient, ZkConnection) = {
-    val zkConnection = new ZkConnection(zkUrl, sessionTimeout)
-    val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer)
-    (zkClient, zkConnection)
+  
+  def close() {
+    if(zkClient != null) {
+      zkClient.close()
+    }
   }
 }
 
@@ -800,7 +851,7 @@ private object ZKStringSerializer extends ZkSerializer {
 }
 
 class ZKGroupDirs(val group: String) {
-  def consumerDir = ZkUtils.ConsumersPath
+  def consumerDir = ConsumersPath
   def consumerGroupDir = consumerDir + "/" + group
   def consumerRegistryDir = consumerGroupDir + "/ids"
   def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
@@ -829,6 +880,7 @@ class ZKConfig(props: VerifiableProperties) {
 
 object ZkPath {
   @volatile private var isNamespacePresent: Boolean = false
+  import scala.collection.JavaConversions._
 
   def checkNamespace(client: ZkClient) {
     if(isNamespacePresent)
@@ -844,24 +896,24 @@ object ZkPath {
     isNamespacePresent = false
   }
 
-  def createPersistent(client: ZkClient, path: String, data: Object) {
+  def createPersistent(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) {
     checkNamespace(client)
-    client.createPersistent(path, data)
+    client.createPersistent(path, data, acls)
   }
 
-  def createPersistent(client: ZkClient, path: String, createParents: Boolean) {
+  def createPersistent(client: ZkClient, path: String, createParents: Boolean, acls: java.util.List[ACL]) {
     checkNamespace(client)
-    client.createPersistent(path, createParents)
+    client.createPersistent(path, createParents, acls)
   }
 
-  def createEphemeral(client: ZkClient, path: String, data: Object) {
+  def createEphemeral(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) {
     checkNamespace(client)
-    client.createEphemeral(path, data)
+    client.createEphemeral(path, data, acls)
   }
 
-  def createPersistentSequential(client: ZkClient, path: String, data: Object): String = {
+  def createPersistentSequential(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]): String = {
     checkNamespace(client)
-    client.createPersistentSequential(path, data)
+    client.createPersistentSequential(path, data, acls)
   }
 }
 
@@ -876,7 +928,8 @@ object ZkPath {
 
 class ZKCheckedEphemeral(path: String,
                          data: String,
-                         zkHandle: ZooKeeper) extends Logging {
+                         zkHandle: ZooKeeper,
+                         isSecure: Boolean) extends Logging {
   private val createCallback = new CreateCallback
   private val getDataCallback = new GetDataCallback
   val latch: CountDownLatch = new CountDownLatch(1)
@@ -936,7 +989,7 @@ class ZKCheckedEphemeral(path: String,
   private def createEphemeral() {
     zkHandle.create(path,
                     ZKStringSerializer.serialize(data),
-                    Ids.OPEN_ACL_UNSAFE,
+                    DefaultAcls(isSecure),
                     CreateMode.EPHEMERAL,
                     createCallback,
                     null)
@@ -949,7 +1002,7 @@ class ZKCheckedEphemeral(path: String,
     } else {
       zkHandle.create(prefix,
                       new Array[Byte](0),
-                      Ids.OPEN_ACL_UNSAFE,
+                      ZkUtils. DefaultAcls(isSecure),
                       CreateMode.PERSISTENT,
                       new StringCallback() {
                         def processResult(rc : Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 16d7c26..db610c1 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -61,7 +61,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     super.setUp()
 
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic, 1, serverCount, this.servers)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index a64c2f3..d973d9a 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -63,7 +63,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     super.setUp()
 
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(this.zkClient, topic, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
   }
 
   @Test
@@ -147,7 +147,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   @Test
   def testAutoCommitOnRebalance() {
     val topic2 = "topic2"
-    TestUtils.createTopic(this.zkClient, topic2, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
 
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
@@ -186,17 +186,17 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     sendRecords(numRecords)
 
     val topic1: String = "tblablac" // matches subscribed pattern
-    TestUtils.createTopic(this.zkClient, topic1, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
     sendRecords(1000, new TopicPartition(topic1, 0))
     sendRecords(1000, new TopicPartition(topic1, 1))
 
     val topic2: String = "tblablak" // does not match subscribed pattern
-    TestUtils.createTopic(this.zkClient, topic2, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
     sendRecords(1000, new TopicPartition(topic2, 0))
     sendRecords(1000, new TopicPartition(topic2, 1))
 
     val topic3: String = "tblab1" // does not match subscribed pattern
-    TestUtils.createTopic(this.zkClient, topic3, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic3, 2, serverCount, this.servers)
     sendRecords(1000, new TopicPartition(topic3, 0))
     sendRecords(1000, new TopicPartition(topic3, 1))
 
@@ -218,7 +218,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment()}")
 
     val topic4: String = "tsomec" // matches subscribed pattern
-    TestUtils.createTopic(this.zkClient, topic4, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic4, 2, serverCount, this.servers)
     sendRecords(1000, new TopicPartition(topic4, 0))
     sendRecords(1000, new TopicPartition(topic4, 1))
 
@@ -242,7 +242,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     sendRecords(numRecords)
 
     val topic1: String = "tblablac" // matches subscribed pattern
-    TestUtils.createTopic(this.zkClient, topic1, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic1, 2, serverCount, this.servers)
     sendRecords(1000, new TopicPartition(topic1, 0))
     sendRecords(1000, new TopicPartition(topic1, 1))
 
@@ -383,7 +383,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   @Test
   def testPartitionsFor() {
     val numParts = 2
-    TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
+    TestUtils.createTopic(this.zkUtils, "part-test", numParts, 1, this.servers)
     val parts = this.consumers(0).partitionsFor("part-test")
     assertNotNull(parts)
     assertEquals(2, parts.size)
@@ -396,9 +396,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     val topic1: String = "part-test-topic-1"
     val topic2: String = "part-test-topic-2"
     val topic3: String = "part-test-topic-3"
-    TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers)
-    TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers)
-    TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers)
+    TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers)
 
     val topics = this.consumers.head.listTopics()
     assertNotNull(topics)
@@ -475,7 +475,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
       this.consumers(0).assignment == subscriptions.asJava
     }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
 
-    TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
     this.consumers(0).subscribe(List(topic, otherTopic))
     TestUtils.waitUntilTrue(() => {
       this.consumers(0).poll(50)
@@ -486,7 +486,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   @Test
   def testShrinkingTopicSubscriptions() {
     val otherTopic = "other"
-    TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
+    TestUtils.createTopic(this.zkUtils, otherTopic, 2, serverCount, this.servers)
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
     val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
     this.consumers(0).subscribe(List(topic, otherTopic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 8080b08..77fcd8b 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -66,7 +66,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       consumers += new KafkaConsumer(consumerConfig)
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName,
+    TestUtils.createTopic(zkUtils, ConsumerCoordinator.OffsetsTopicName,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 2dbb9dc..29e146e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -87,7 +87,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   @Test
   def testBrokerFailure() {
     val numPartitions = 3
-    val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers)
+    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers)
     assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
 
     val scheduler = new ProducerScheduler()
@@ -107,7 +107,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
       assertTrue(scheduler.failed == false)
 
       // Make sure the leader still exists after bouncing brokers
-      (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition))
+      (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, partition))
     }
 
     scheduler.shutdown

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 87db255..0d401f7 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -79,7 +79,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
 
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 1, List(server))
+      TestUtils.createTopic(zkUtils, topic, 1, 1, List(server))
       val partition = 0
 
       // prepare the messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index e90818a..8ba7fad 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -84,7 +84,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testTooLargeRecordWithAckZero() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
     // send a too-large record
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -97,7 +97,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testTooLargeRecordWithAckOne() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
     // send a too-large record
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -131,7 +131,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testWrongBrokerList() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
     // producer with incorrect broker list
     producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
@@ -149,7 +149,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testInvalidPartition() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
     // create a record with incorrect partition id, send should fail
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes)
@@ -170,7 +170,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testSendAfterClosed() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
 
@@ -209,7 +209,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas",(numServers+1).toString)
 
-    TestUtils.createTopic(zkClient, topicName, 1, numServers, servers, topicProps)
+    TestUtils.createTopic(zkUtils, topicName, 1, numServers, servers, topicProps)
 
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     try {
@@ -229,7 +229,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas",numServers.toString)
 
-    TestUtils.createTopic(zkClient, topicName, 1, numServers, servers,topicProps)
+    TestUtils.createTopic(zkUtils, topicName, 1, numServers, servers,topicProps)
 
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     // this should work with all brokers up and running

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 637d6f3..3aef172 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -91,7 +91,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       // send a normal record
       val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
@@ -194,7 +194,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       // non-blocking send a list of records
       val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
@@ -230,7 +230,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // create topic
-      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val partition = 1
 
       // make sure leaders exist
@@ -289,7 +289,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
       assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
 
     } finally {
       if (producer != null) {
@@ -306,7 +306,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
   def testFlush() {
     var producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
     try {
-      TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
       for(i <- 0 until 50) {
         val responses = (0 until numRecords) map (i => producer.send(record))
@@ -328,7 +328,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
     var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
     try {
       // create topic
-      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val leader0 = leaders(0)
       val leader1 = leaders(1)
 
@@ -372,7 +372,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
     var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
     try {
       // create topic
-      val leaders = TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
       val leader = leaders(0)
 
       // create record

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index 38b3dbd..bdf7e49 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -85,7 +85,7 @@ class QuotasTest extends KafkaServerTestHarness {
     producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
 
     val numPartitions = 1
-    val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers)
+    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers)
     leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1)
     followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1)
     assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
index 2f72c78..5dc4cbc 100644
--- a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
@@ -97,14 +97,14 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
 
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName,
+    TestUtils.createTopic(zkUtils, ConsumerCoordinator.OffsetsTopicName,
       overridingProps.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       overridingProps.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,
       servers(0).consumerCoordinator.offsetsTopicConfigs)
 
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(this.zkClient, topic, 1, numServers, this.servers)
+    TestUtils.createTopic(zkUtils, topic, 1, numServers, this.servers)
   }
 
   @After
@@ -193,7 +193,7 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
   @Test
   def testPartitionsFor() {
     val numParts = 2
-    TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
+    TestUtils.createTopic(zkUtils, "part-test", numParts, 1, this.servers)
     val parts = this.consumers(0).partitionsFor("part-test")
     assertNotNull(parts)
     assertEquals(2, parts.length)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
index 0f70624..c22e57a 100644
--- a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
@@ -95,7 +95,7 @@ class SSLProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       // send a normal record
       val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
@@ -159,7 +159,7 @@ class SSLProducerSendTest extends KafkaServerTestHarness {
     var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile))
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
 
       // non-blocking send a list of records
       val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
@@ -194,7 +194,7 @@ class SSLProducerSendTest extends KafkaServerTestHarness {
     var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile))
     try {
       // create topic
-      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val partition = 1
 
       // make sure leaders exist

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/other/kafka/DeleteZKPath.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala
index fb8ab9f..92bde88 100755
--- a/core/src/test/scala/other/kafka/DeleteZKPath.scala
+++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala
@@ -32,10 +32,10 @@ object DeleteZKPath {
     val config = new ConsumerConfig(Utils.loadProps(args(0)))
     val zkPath = args(1)
 
-    val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
+    val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
 
     try {
-      ZkUtils.deletePathRecursive(zkClient, zkPath);
+      zkUtils.deletePathRecursive(zkPath);
       System.out.println(zkPath + " is deleted")
     } catch {
       case e: Exception => System.err.println("Path not deleted " + e.printStackTrace())