Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/19 00:24:10 UTC

[5/5] kafka git commit: KAFKA-2639: Refactoring of ZkUtils

KAFKA-2639: Refactoring of ZkUtils

I've split this out from the work on KAFKA-1695 because the refactoring touches a large number of files. Most of the changes are trivial, but I feel it will be easier to review this way.

This pull request incorporates the changes that Parth-Brahmbhatt started in order to address KAFKA-1695.
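
To orient reviewers: ZkUtils changes from a static object whose helpers each take a ZkClient into a class that wraps the client, so call sites hold a single handle instead of threading a ZkClient through every call. A minimal before/after sketch in Scala (connection string, timeouts, and the secure flag are illustrative, mirroring the call sites in this diff):

    // Before: static helpers, ZkClient passed to every call
    val zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
    val brokers  = ZkUtils.getSortedBrokerList(zkClient)
    ZkUtils.createPersistentPath(zkClient, path, data)

    // After: a ZkUtils instance wraps the client; the fourth argument
    // is whether the ZooKeeper session should be SASL-secured
    val zkUtils = ZkUtils(zkConnect, 30000, 30000, false)
    val brokers = zkUtils.getSortedBrokerList()
    zkUtils.createPersistentPath(path, data)
    zkUtils.close()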

Author: flavio junqueira <fp...@apache.org>
Author: Flavio Junqueira <fp...@apache.org>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #303 from fpj/KAFKA-2639


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ce306ba4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ce306ba4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ce306ba4

Branch: refs/heads/trunk
Commit: ce306ba4ebc77464bf8ff4d656e1f1f44979182e
Parents: 78a2e2f
Author: flavio junqueira <fp...@apache.org>
Authored: Sun Oct 18 15:23:52 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Oct 18 15:23:52 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 .../apache/kafka/common/security/JaasUtils.java |  64 +++
 .../src/main/scala/kafka/admin/AdminUtils.scala | 129 +++---
 .../main/scala/kafka/admin/ConfigCommand.scala  |  26 +-
 .../kafka/admin/ConsumerGroupCommand.scala      |  74 ++--
 .../PreferredReplicaLeaderElectionCommand.scala |  27 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |  67 ++--
 .../main/scala/kafka/admin/TopicCommand.scala   |  65 +--
 .../main/scala/kafka/client/ClientUtils.scala   |  12 +-
 .../main/scala/kafka/cluster/Partition.scala    |   6 +-
 .../ZkNodeChangeNotificationListener.scala      |  14 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |   6 +-
 .../kafka/consumer/PartitionAssignor.scala      |  10 +-
 .../main/scala/kafka/consumer/TopicCount.scala  |  14 +-
 .../consumer/ZookeeperConsumerConnector.scala   |  58 +--
 .../consumer/ZookeeperTopicEventWatcher.scala   |  20 +-
 .../kafka/controller/KafkaController.scala      |  89 ++---
 .../controller/PartitionLeaderSelector.scala    |   2 +-
 .../controller/PartitionStateMachine.scala      |  39 +-
 .../kafka/controller/ReplicaStateMachine.scala  |  12 +-
 .../kafka/controller/TopicDeletionManager.scala |  12 +-
 .../kafka/coordinator/ConsumerCoordinator.scala |  16 +-
 .../kafka/coordinator/CoordinatorMetadata.scala |  10 +-
 .../security/auth/SimpleAclAuthorizer.scala     |  42 +-
 .../kafka/server/DynamicConfigManager.scala     |  16 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  10 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |   7 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  58 +--
 .../main/scala/kafka/server/OffsetManager.scala |   4 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   2 +-
 .../scala/kafka/server/ReplicaManager.scala     |   4 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |  12 +-
 .../kafka/tools/ConsumerOffsetChecker.scala     |  40 +-
 .../scala/kafka/tools/ExportZkOffsets.scala     |  24 +-
 .../scala/kafka/tools/ImportZkOffsets.scala     |   9 +-
 .../scala/kafka/tools/UpdateOffsetsInZK.scala   |  19 +-
 .../kafka/tools/VerifyConsumerRebalance.scala   |  25 +-
 .../scala/kafka/utils/ReplicationUtils.scala    |  25 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 399 +++++++++++--------
 .../kafka/api/ConsumerBounceTest.scala          |   2 +-
 .../integration/kafka/api/ConsumerTest.scala    |  26 +-
 .../kafka/api/IntegrationTestHarness.scala      |   2 +-
 .../kafka/api/ProducerBounceTest.scala          |   4 +-
 .../kafka/api/ProducerCompressionTest.scala     |   2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |  14 +-
 .../kafka/api/ProducerSendTest.scala            |  14 +-
 .../integration/kafka/api/QuotasTest.scala      |   2 +-
 .../integration/kafka/api/SSLConsumerTest.scala |   6 +-
 .../kafka/api/SSLProducerSendTest.scala         |   6 +-
 .../test/scala/other/kafka/DeleteZKPath.scala   |   4 +-
 .../scala/other/kafka/TestOffsetManager.scala   |  24 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  34 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala | 107 +++--
 .../kafka/admin/DeleteConsumerGroupTest.scala   |  48 +--
 .../unit/kafka/admin/DeleteTopicTest.scala      |  85 ++--
 .../unit/kafka/admin/TopicCommandTest.scala     |  36 +-
 .../ZkNodeChangeNotificationListenerTest.scala  |   6 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   |   2 +-
 .../kafka/consumer/PartitionAssignorTest.scala  |  23 +-
 .../ZookeeperConsumerConnectorTest.scala        |  38 +-
 .../controller/ControllerFailoverTest.scala     |   2 +-
 .../coordinator/CoordinatorMetadataTest.scala   |  52 +--
 .../kafka/integration/AutoOffsetResetTest.scala |   2 +-
 .../integration/BaseTopicMetadataTest.scala     |  14 +-
 .../unit/kafka/integration/FetcherTest.scala    |   4 +-
 .../kafka/integration/PrimitiveApiTest.scala    |   8 +-
 .../kafka/integration/RollingBounceTest.scala   |  10 +-
 .../integration/UncleanLeaderElectionTest.scala |  22 +-
 .../ZookeeperConsumerConnectorTest.scala        |   2 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala  |   8 +-
 .../unit/kafka/producer/ProducerTest.scala      |  18 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  18 +-
 .../security/auth/SimpleAclAuthorizerTest.scala |   6 +-
 .../unit/kafka/server/AdvertiseBrokerTest.scala |   2 +-
 .../kafka/server/BaseReplicaFetchTest.scala     |   2 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  10 +-
 .../server/HighwatermarkPersistenceTest.scala   |  18 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  18 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   8 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |  14 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  16 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  11 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |   2 +-
 .../unit/kafka/server/ServerStartupTest.scala   |   6 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   6 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala |  16 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  57 +--
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  26 +-
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   |  48 +--
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  10 +-
 91 files changed, 1283 insertions(+), 1109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 247f556..289f1d0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -31,6 +31,7 @@
   <allow pkg="org.powermock" />
 
   <allow pkg="javax.net.ssl" />
+  <allow pkg="javax.security.auth" />
 
   <!-- no one depends on the server -->
   <disallow pkg="kafka" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
new file mode 100644
index 0000000..ce0be62
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security;
+
+import java.io.File;
+import java.net.URI;
+import java.security.URIParameter;
+import javax.security.auth.login.Configuration;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JaasUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class);
+    public static final String LOGIN_CONTEXT_SERVER = "KafkaServer";
+    public static final String LOGIN_CONTEXT_CLIENT = "KafkaClient";
+    public static final String SERVICE_NAME = "serviceName";
+    public static final String JAVA_LOGIN_CONFIG_PARAM = "java.security.auth.login.config";
+    public static final String ZK_SASL_CLIENT = "zookeeper.sasl.client";
+    public static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig";
+
+    public static boolean isZkSecurityEnabled(String loginConfigFile) {
+        boolean isSecurityEnabled = false;
+        boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
+        String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client");
+
+        if (loginConfigFile != null && loginConfigFile.length() > 0) {
+            File configFile = new File(loginConfigFile);
+            if (!configFile.canRead()) {
+                throw new KafkaException("File " + loginConfigFile + " cannot be read.");
+            }
+            try {
+                URI configUri = configFile.toURI();
+                Configuration loginConf = Configuration.getInstance("JavaLoginConfig", new URIParameter(configUri));
+                isSecurityEnabled = loginConf.getAppConfigurationEntry(zkLoginContextName) != null;
+            } catch (Exception e) {
+                throw new KafkaException(e);
+            }
+            if (isSecurityEnabled && !zkSaslEnabled) {
+                LOG.error("JAAS file is present, but system property " + 
+                            ZK_SASL_CLIENT + " is set to false, which disables " +
+                            "SASL in the ZooKeeper client");
+                throw new KafkaException("JAAS file is present, but " + ZK_SASL_CLIENT + " is set to false; ZooKeeper client SASL is disabled");
+            }
+        }
+
+        return isSecurityEnabled;
+    }
+}
\ No newline at end of file
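
This helper is how the tools in this patch decide whether their ZooKeeper sessions should use SASL: it returns true only when a readable JAAS login file defines the ZooKeeper client context, and it fails fast if such a file is present while zookeeper.sasl.client is set to false. A short Scala sketch of the wiring used by the command-line tools later in this diff (zkConnect is assumed defined):

    import kafka.utils.ZkUtils
    import org.apache.kafka.common.security.JaasUtils

    val secure = JaasUtils.isZkSecurityEnabled(
      System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
    val zkUtils = ZkUtils(zkConnect, 30000, 30000, secure)
    try {
      // ... ZooKeeper reads/writes through zkUtils ...
    } finally zkUtils.close()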

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 9966660..ecc5b9d 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -23,6 +23,7 @@ import kafka.cluster.{BrokerEndPoint, Broker}
 import kafka.log.LogConfig
 import kafka.server.ConfigType
 import kafka.utils._
+import kafka.utils.ZkUtils._
 import kafka.api.{TopicMetadata, PartitionMetadata}
 
 import java.util.Random
@@ -103,12 +104,12 @@ object AdminUtils extends Logging {
   * @param replicaAssignmentStr Manual replica assignment
   * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
   */
-  def addPartitions(zkClient: ZkClient,
+  def addPartitions(zkUtils: ZkUtils,
                     topic: String,
                     numPartitions: Int = 1,
                     replicaAssignmentStr: String = "",
                     checkBrokerAvailable: Boolean = true) {
-    val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+    val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
     if (existingPartitionsReplicaList.size == 0)
       throw new AdminOperationException("The topic %s does not exist".format(topic))
 
@@ -118,7 +119,7 @@ object AdminUtils extends Logging {
       throw new AdminOperationException("The number of partitions for a topic can only be increased")
 
     // create the new partition replication list
-    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+    val brokerList = zkUtils.getSortedBrokerList()
     val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
       AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
     else
@@ -134,7 +135,7 @@ object AdminUtils extends Logging {
     val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
     // add the new list
     partitionReplicaList ++= newPartitionReplicaList
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList, update = true)
   }
 
   def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
@@ -159,9 +160,9 @@ object AdminUtils extends Logging {
     ret.toMap
   }
   
-  def deleteTopic(zkClient: ZkClient, topic: String) {
+  def deleteTopic(zkUtils: ZkUtils, topic: String) {
     try {
-      ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
+      zkUtils.createPersistentPath(getDeleteTopicPath(topic))
     } catch {
       case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
         "topic %s is already marked for deletion".format(topic))
@@ -169,8 +170,8 @@ object AdminUtils extends Logging {
     }
   }
   
-  def isConsumerGroupActive(zkClient: ZkClient, group: String) = {
-    ZkUtils.getConsumersInGroup(zkClient, group).nonEmpty
+  def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
+    zkUtils.getConsumersInGroup(group).nonEmpty
   }
 
   /**
@@ -180,10 +181,10 @@ object AdminUtils extends Logging {
    * @param group Consumer group
    * @return whether or not we deleted the consumer group information
    */
-  def deleteConsumerGroupInZK(zkClient: ZkClient, group: String) = {
-    if (!isConsumerGroupActive(zkClient, group)) {
+  def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
+    if (!isConsumerGroupActive(zkUtils, group)) {
       val dir = new ZKGroupDirs(group)
-      ZkUtils.deletePathRecursive(zkClient, dir.consumerGroupDir)
+      zkUtils.deletePathRecursive(dir.consumerGroupDir)
       true
     }
     else false
@@ -198,15 +199,15 @@ object AdminUtils extends Logging {
    * @param topic Topic of the consumer group information we wish to delete
    * @return whether or not we deleted the consumer group information for the given topic
    */
-  def deleteConsumerGroupInfoForTopicInZK(zkClient: ZkClient, group: String, topic: String) = {
-    val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
+  def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
+    val topics = zkUtils.getTopicsByConsumerGroup(group)
     if (topics == Seq(topic)) {
-      deleteConsumerGroupInZK(zkClient, group)
+      deleteConsumerGroupInZK(zkUtils, group)
     }
-    else if (!isConsumerGroupActive(zkClient, group)) {
+    else if (!isConsumerGroupActive(zkUtils, group)) {
       val dir = new ZKGroupTopicDirs(group, topic)
-      ZkUtils.deletePathRecursive(zkClient, dir.consumerOwnerDir)
-      ZkUtils.deletePathRecursive(zkClient, dir.consumerOffsetDir)
+      zkUtils.deletePathRecursive(dir.consumerOwnerDir)
+      zkUtils.deletePathRecursive(dir.consumerOffsetDir)
       true
     }
     else false
@@ -218,25 +219,25 @@ object AdminUtils extends Logging {
    * @param zkClient Zookeeper client
    * @param topic Topic of the consumer group information we wish to delete
    */
-  def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String) {
-    val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic)
-    groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
+  def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
+    val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
+    groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
   }
 
-  def topicExists(zkClient: ZkClient, topic: String): Boolean = 
-    zkClient.exists(ZkUtils.getTopicPath(topic))
+  def topicExists(zkUtils: ZkUtils, topic: String): Boolean = 
+    zkUtils.zkClient.exists(getTopicPath(topic))
     
-  def createTopic(zkClient: ZkClient,
+  def createTopic(zkUtils: ZkUtils,
                   topic: String,
                   partitions: Int, 
                   replicationFactor: Int, 
                   topicConfig: Properties = new Properties) {
-    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+    val brokerList = zkUtils.getSortedBrokerList()
     val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
   }
 
-  def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
+  def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                      topic: String,
                                                      partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                      config: Properties = new Properties,
@@ -245,13 +246,13 @@ object AdminUtils extends Logging {
     Topic.validate(topic)
     require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
 
-    val topicPath = ZkUtils.getTopicPath(topic)
+    val topicPath = getTopicPath(topic)
 
     if (!update) {
-      if (zkClient.exists(topicPath))
+      if (zkUtils.zkClient.exists(topicPath))
         throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
       else if (Topic.hasCollisionChars(topic)) {
-        val allTopics = ZkUtils.getAllTopics(zkClient)
+        val allTopics = zkUtils.getAllTopics()
         val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, t))
         if (collidingTopics.nonEmpty) {
           throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
@@ -265,24 +266,24 @@ object AdminUtils extends Logging {
     if (!update) {
       // write out the config if there is any, this isn't transactional with the partition assignments
       LogConfig.validate(config)
-      writeEntityConfig(zkClient, ConfigType.Topic, topic, config)
+      writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
     }
 
     // create the partition assignment
-    writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
+    writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
   }
 
-  private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
+  private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
     try {
-      val zkPath = ZkUtils.getTopicPath(topic)
-      val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
+      val zkPath = getTopicPath(topic)
+      val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
 
       if (!update) {
         info("Topic creation " + jsonPartitionData.toString)
-        ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
+        zkUtils.createPersistentPath(zkPath, jsonPartitionData)
       } else {
         info("Topic update " + jsonPartitionData.toString)
-        ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData)
+        zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
       }
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
@@ -299,8 +300,8 @@ object AdminUtils extends Logging {
    *                 existing configs need to be deleted, it should be done prior to invoking this API
    *
    */
-  def changeClientIdConfig(zkClient: ZkClient, clientId: String, configs: Properties) {
-    changeEntityConfig(zkClient, ConfigType.Client, clientId, configs)
+  def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) {
+    changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs)
   }
 
   /**
@@ -311,22 +312,22 @@ object AdminUtils extends Logging {
    *                 existing configs need to be deleted, it should be done prior to invoking this API
    *
    */
-  def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
-    if(!topicExists(zkClient, topic))
+  def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
+    if(!topicExists(zkUtils, topic))
       throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
     // remove the topic overrides
     LogConfig.validate(configs)
-    changeEntityConfig(zkClient, ConfigType.Topic, topic, configs)
+    changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
   }
 
-  private def changeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, configs: Properties) {
+  private def changeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties) {
     // write the new config--may not exist if there were previously no overrides
-    writeEntityConfig(zkClient, entityType, entityName, configs)
+    writeEntityConfig(zkUtils, entityType, entityName, configs)
 
     // create the change notification
     val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
     val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
-    zkClient.createPersistentSequential(seqNode, content)
+    zkUtils.zkClient.createPersistentSequential(seqNode, content)
   }
 
   def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
@@ -336,20 +337,20 @@ object AdminUtils extends Logging {
   /**
    * Write out the topic config to zk, if there is any
    */
-  private def writeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, config: Properties) {
+  private def writeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, config: Properties) {
     val configMap: mutable.Map[String, String] = {
       import JavaConversions._
       config
     }
     val map = Map("version" -> 1, "config" -> configMap)
-    ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(entityType, entityName), Json.encode(map))
+    zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), Json.encode(map))
   }
   
   /**
    * Read the entity (topic or client) config (if any) from zk
    */
-  def fetchEntityConfig(zkClient: ZkClient, entityType: String, entity: String): Properties = {
-    val str: String = zkClient.readData(ZkUtils.getEntityConfigPath(entityType, entity), true)
+  def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entity: String): Properties = {
+    val str: String = zkUtils.zkClient.readData(getEntityConfigPath(entityType, entity), true)
     val props = new Properties()
     if(str != null) {
       Json.parseFull(str) match {
@@ -375,28 +376,28 @@ object AdminUtils extends Logging {
     props
   }
 
-  def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
-    ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchEntityConfig(zkClient, ConfigType.Topic, topic))).toMap
+  def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
+    zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
 
-  def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
-    fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
+  def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata =
+    fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
 
-  def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
+  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[TopicMetadata] = {
     val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
-    topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
+    topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo))
   }
 
 
 
-  private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
-    if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
-      val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
+  private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
+    if(zkUtils.pathExists(getTopicPath(topic))) {
+      val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get
       val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
       val partitionMetadata = sortedPartitions.map { partitionMap =>
         val partition = partitionMap._1
         val replicas = partitionMap._2
-        val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
-        val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+        val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partition)
+        val leader = zkUtils.getLeaderForPartition(topic, partition)
         debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
         var leaderInfo: Option[BrokerEndPoint] = None
@@ -406,15 +407,15 @@ object AdminUtils extends Logging {
           leaderInfo = leader match {
             case Some(l) =>
               try {
-                Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol))
+                Some(getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getBrokerEndPoint(protocol))
               } catch {
                 case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
               }
             case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
           }
           try {
-            replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas).map(_.getBrokerEndPoint(protocol))
-            isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol))
+            replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getBrokerEndPoint(protocol))
+            isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getBrokerEndPoint(protocol))
           } catch {
             case e: Throwable => throw new ReplicaNotAvailableException(e)
           }
@@ -439,7 +440,7 @@ object AdminUtils extends Logging {
     }
   }
 
-  private def getBrokerInfoFromCache(zkClient: ZkClient,
+  private def getBrokerInfoFromCache(zkUtils: ZkUtils,
                                      cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
                                      brokerIds: Seq[Int]): Seq[Broker] = {
     var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
@@ -448,7 +449,7 @@ object AdminUtils extends Logging {
       optionalBrokerInfo match {
         case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
         case None => // fetch it from zookeeper
-          ZkUtils.getBrokerInfo(zkClient, id) match {
+          zkUtils.getBrokerInfo(id) match {
             case Some(brokerInfo) =>
               cachedBrokerInfo += (id -> brokerInfo)
               Some(brokerInfo)
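
The net effect across AdminUtils: admin operations now take the ZkUtils handle as their first argument rather than a raw ZkClient. A minimal sketch against the refactored API (topic name, sizes, and the isSecure flag are illustrative):

    val zkUtils = ZkUtils(zkConnect, 30000, 30000, isSecure)
    try {
      if (!AdminUtils.topicExists(zkUtils, "example-topic"))
        AdminUtils.createTopic(zkUtils, "example-topic", partitions = 4, replicationFactor = 2)
    } finally zkUtils.close()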

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index f0217de..ba4c003 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -26,6 +26,7 @@ import org.I0Itec.zkclient.ZkClient
 import scala.collection._
 import scala.collection.JavaConversions._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
 
 
 /**
@@ -42,52 +43,55 @@ object ConfigCommand {
 
     opts.checkArgs()
 
-    val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
+    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
+                          30000,
+                          30000,
+                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
 
     try {
       if (opts.options.has(opts.alterOpt))
-        alterConfig(zkClient, opts)
+        alterConfig(zkUtils, opts)
       else if (opts.options.has(opts.describeOpt))
-        describeConfig(zkClient, opts)
+        describeConfig(zkUtils, opts)
     } catch {
       case e: Throwable =>
         println("Error while executing topic command " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
-      zkClient.close()
+      zkUtils.close()
     }
   }
 
-  private def alterConfig(zkClient: ZkClient, opts: ConfigCommandOptions) {
+  private def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
     val configsToBeAdded = parseConfigsToBeAdded(opts)
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
     val entityType = opts.options.valueOf(opts.entityType)
     val entityName = opts.options.valueOf(opts.entityName)
 
     // compile the final set of configs
-    val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
+    val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
     configs.putAll(configsToBeAdded)
     configsToBeDeleted.foreach(config => configs.remove(config))
 
     if (entityType.equals(ConfigType.Topic)) {
-      AdminUtils.changeTopicConfig(zkClient, entityName, configs)
+      AdminUtils.changeTopicConfig(zkUtils, entityName, configs)
       println("Updated config for topic: \"%s\".".format(entityName))
     } else {
-      AdminUtils.changeClientIdConfig(zkClient, entityName, configs)
+      AdminUtils.changeClientIdConfig(zkUtils, entityName, configs)
       println("Updated config for clientId: \"%s\".".format(entityName))
     }
   }
 
-  private def describeConfig(zkClient: ZkClient, opts: ConfigCommandOptions) {
+  private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
     val entityType = opts.options.valueOf(opts.entityType)
     val entityNames: Seq[String] =
       if (opts.options.has(opts.entityName))
         Seq(opts.options.valueOf(opts.entityName))
       else
-        ZkUtils.getAllEntitiesWithConfig(zkClient, entityType)
+        zkUtils.getAllEntitiesWithConfig(entityType)
 
     for (entityName <- entityNames) {
-      val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName)
+      val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
       println("Configs for %s:%s are %s"
                       .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index f23120e..8efbb2a 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -31,6 +31,7 @@ import scala.collection.{Set, mutable}
 import kafka.consumer.SimpleConsumer
 import collection.JavaConversions._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
 
 
 object ConsumerGroupCommand {
@@ -48,57 +49,60 @@ object ConsumerGroupCommand {
 
     opts.checkArgs()
 
-    val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
+    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt), 
+                          30000,
+                          30000,
+                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
 
     try {
       if (opts.options.has(opts.listOpt))
-        list(zkClient)
+        list(zkUtils)
       else if (opts.options.has(opts.describeOpt))
-        describe(zkClient, opts)
+        describe(zkUtils, opts)
       else if (opts.options.has(opts.deleteOpt))
-        delete(zkClient, opts)
+        delete(zkUtils, opts)
     } catch {
       case e: Throwable =>
         println("Error while executing consumer group command " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
-      zkClient.close()
+      zkUtils.close()
     }
   }
 
-  def list(zkClient: ZkClient) {
-    ZkUtils.getConsumerGroups(zkClient).foreach(println)
+  def list(zkUtils: ZkUtils) {
+    zkUtils.getConsumerGroups().foreach(println)
   }
 
-  def describe(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+  def describe(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
     val configs = parseConfigs(opts)
     val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
     val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
     val group = opts.options.valueOf(opts.groupOpt)
-    val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group)
+    val topics = zkUtils.getTopicsByConsumerGroup(group)
     if (topics.isEmpty) {
       println("No topic available for consumer group provided")
     }
-    topics.foreach(topic => describeTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
+    topics.foreach(topic => describeTopic(zkUtils, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs))
   }
 
-  def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+  def delete(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
     if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) {
-      deleteForTopic(zkClient, opts)
+      deleteForTopic(zkUtils, opts)
     }
     else if (opts.options.has(opts.groupOpt)) {
-      deleteForGroup(zkClient, opts)
+      deleteForGroup(zkUtils, opts)
     }
     else if (opts.options.has(opts.topicOpt)) {
-      deleteAllForTopic(zkClient, opts)
+      deleteAllForTopic(zkUtils, opts)
     }
   }
 
-  private def deleteForGroup(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+  private def deleteForGroup(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
     val groups = opts.options.valuesOf(opts.groupOpt)
     groups.foreach { group =>
       try {
-        if (AdminUtils.deleteConsumerGroupInZK(zkClient, group))
+        if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
           println("Deleted all consumer group information for group %s in zookeeper.".format(group))
         else
           println("Delete for group %s failed because its consumers are still active.".format(group))
@@ -110,13 +114,13 @@ object ConsumerGroupCommand {
     }
   }
 
-  private def deleteForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+  private def deleteForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
     val groups = opts.options.valuesOf(opts.groupOpt)
     val topic = opts.options.valueOf(opts.topicOpt)
     Topic.validate(topic)
     groups.foreach { group =>
       try {
-        if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic))
+        if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
           println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic))
         else
           println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic))
@@ -128,10 +132,10 @@ object ConsumerGroupCommand {
     }
   }
 
-  private def deleteAllForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) {
+  private def deleteAllForTopic(zkUtils: ZkUtils, opts: ConsumerGroupCommandOptions) {
     val topic = opts.options.valueOf(opts.topicOpt)
     Topic.validate(topic)
-    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)
+    AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
     println("Deleted consumer group information for all inactive consumer groups for topic %s in zookeeper.".format(topic))
   }
 
@@ -144,35 +148,35 @@ object ConsumerGroupCommand {
     props
   }
 
-  private def describeTopic(zkClient: ZkClient,
+  private def describeTopic(zkUtils: ZkUtils,
                             group: String,
                             topic: String,
                             channelSocketTimeoutMs: Int,
                             channelRetryBackoffMs: Int) {
-    val topicPartitions = getTopicPartitions(zkClient, topic)
-    val partitionOffsets = getPartitionOffsets(zkClient, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
+    val topicPartitions = getTopicPartitions(zkUtils, topic)
+    val partitionOffsets = getPartitionOffsets(zkUtils, group, topicPartitions, channelSocketTimeoutMs, channelRetryBackoffMs)
     println("%s, %s, %s, %s, %s, %s, %s"
       .format("GROUP", "TOPIC", "PARTITION", "CURRENT OFFSET", "LOG END OFFSET", "LAG", "OWNER"))
     topicPartitions
       .sortBy { case topicPartition => topicPartition.partition }
       .foreach { topicPartition =>
-      describePartition(zkClient, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
+      describePartition(zkUtils, group, topicPartition.topic, topicPartition.partition, partitionOffsets.get(topicPartition))
     }
   }
 
-  private def getTopicPartitions(zkClient: ZkClient, topic: String) = {
-    val topicPartitionMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
+  private def getTopicPartitions(zkUtils: ZkUtils, topic: String) = {
+    val topicPartitionMap = zkUtils.getPartitionsForTopics(Seq(topic))
     val partitions = topicPartitionMap.getOrElse(topic, Seq.empty)
     partitions.map(TopicAndPartition(topic, _))
   }
 
-  private def getPartitionOffsets(zkClient: ZkClient,
+  private def getPartitionOffsets(zkUtils: ZkUtils,
                                   group: String,
                                   topicPartitions: Seq[TopicAndPartition],
                                   channelSocketTimeoutMs: Int,
                                   channelRetryBackoffMs: Int): Map[TopicAndPartition, Long] = {
     val offsetMap = mutable.Map[TopicAndPartition, Long]()
-    val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)
+    val channel = ClientUtils.channelToOffsetManager(group, zkUtils, channelSocketTimeoutMs, channelRetryBackoffMs)
     channel.send(OffsetFetchRequest(group, topicPartitions))
     val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload())
 
@@ -182,7 +186,7 @@ object ConsumerGroupCommand {
         // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
         // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
         try {
-          val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
+          val offset = zkUtils.readData(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1.toLong
           offsetMap.put(topicAndPartition, offset)
         } catch {
           case z: ZkNoNodeException =>
@@ -200,20 +204,20 @@ object ConsumerGroupCommand {
     offsetMap.toMap
   }
 
-  private def describePartition(zkClient: ZkClient,
+  private def describePartition(zkUtils: ZkUtils,
                                 group: String,
                                 topic: String,
                                 partition: Int,
                                 offsetOpt: Option[Long]) {
     val topicAndPartition = TopicAndPartition(topic, partition)
     val groupDirs = new ZKGroupTopicDirs(group, topic)
-    val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/" + partition)._1
-    ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
+    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/" + partition)._1
+    zkUtils.getLeaderForPartition(topic, partition) match {
       case Some(-1) =>
         println("%s, %s, %s, %s, %s, %s, %s"
           .format(group, topic, partition, offsetOpt.getOrElse("unknown"), "unknown", "unknown", owner.getOrElse("none")))
       case Some(brokerId) =>
-        val consumerOpt = getConsumer(zkClient, brokerId)
+        val consumerOpt = getConsumer(zkUtils, brokerId)
         consumerOpt match {
           case Some(consumer) =>
             val request =
@@ -231,9 +235,9 @@ object ConsumerGroupCommand {
     }
   }
 
-  private def getConsumer(zkClient: ZkClient, brokerId: Int): Option[SimpleConsumer] = {
+  private def getConsumer(zkUtils: ZkUtils, brokerId: Int): Option[SimpleConsumer] = {
     try {
-      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+      zkUtils.readDataMaybeNull(ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
         case Some(brokerInfoString) =>
           Json.parseFull(brokerInfoString) match {
             case Some(m) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 2aa6e62..e74fcb6 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
 import collection._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
 
 object PreferredReplicaLeaderElectionCommand extends Logging {
 
@@ -51,15 +52,19 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
 
     val zkConnect = options.valueOf(zkConnectOpt)
     var zkClient: ZkClient = null
-
+    var zkUtils: ZkUtils = null
     try {
       zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
+      zkUtils = ZkUtils(zkConnect, 
+                        30000,
+                        30000,
+                        JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
       val partitionsForPreferredReplicaElection =
         if (!options.has(jsonFileOpt))
-          ZkUtils.getAllPartitions(zkClient)
+          zkUtils.getAllPartitions()
         else
           parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
-      val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
+      val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkUtils, partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
       println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
@@ -95,18 +100,18 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     }
   }
 
-  def writePreferredReplicaElectionData(zkClient: ZkClient,
+  def writePreferredReplicaElectionData(zkUtils: ZkUtils,
                                         partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
     val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
     val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
     val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
     try {
-      ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+      zkUtils.createPersistentPath(zkPath, jsonData)
       info("Created preferred replica election path with %s".format(jsonData))
     } catch {
       case nee: ZkNodeExistsException =>
         val partitionsUndergoingPreferredReplicaElection =
-          PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
+          PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
         throw new AdminOperationException("Preferred replica leader election currently in progress for " +
           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
       case e2: Throwable => throw new AdminOperationException(e2.toString)
@@ -114,20 +119,20 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
   }
 }
 
-class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scala.collection.Set[TopicAndPartition])
+class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitions: scala.collection.Set[TopicAndPartition])
   extends Logging {
   def moveLeaderToPreferredReplica() = {
     try {
-      val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
-      PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
+      val validPartitions = partitions.filter(p => validatePartition(zkUtils, p.topic, p.partition))
+      PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, validPartitions)
     } catch {
       case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
     }
   }
 
-  def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = {
+  def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): Boolean = {
     // check if partition exists
-    val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
+    val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic)
     partitionsOpt match {
       case Some(partitions) =>
         if(partitions.contains(partition)) {
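
For reference, writePreferredReplicaElectionData above encodes the partition set as versioned JSON under ZkUtils.PreferredReplicaLeaderElectionPath; the payload has the form (topic and partition values illustrative):

    {"version":1,"partitions":[{"topic":"example-topic","partition":0}]}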

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index ea34589..10182f6 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.common.{TopicAndPartition, AdminCommandFailedException}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
 
 object ReassignPartitionsCommand extends Logging {
 
@@ -38,33 +39,37 @@ object ReassignPartitionsCommand extends Logging {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
-    var zkClient: ZkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000)
+    val zkUtils = ZkUtils(zkConnect, 
+                          30000,
+                          30000,
+                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
     try {
       if(opts.options.has(opts.verifyOpt))
-        verifyAssignment(zkClient, opts)
+        verifyAssignment(zkUtils, opts)
       else if(opts.options.has(opts.generateOpt))
-        generateAssignment(zkClient, opts)
+        generateAssignment(zkUtils, opts)
       else if (opts.options.has(opts.executeOpt))
-        executeAssignment(zkClient, opts)
+        executeAssignment(zkUtils, opts)
     } catch {
       case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
+      val zkClient = zkUtils.zkClient
       if (zkClient != null)
         zkClient.close()
     }
   }
 
-  def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+  def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
     if(!opts.options.has(opts.reassignmentJsonFileOpt))
       CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option")
     val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val jsonString = Utils.readFileAsString(jsonFile)
-    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
+    val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentData(jsonString)
 
     println("Status of partition reassignment:")
-    val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+    val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned)
     reassignedPartitionsStatus.foreach { partition =>
       partition._2 match {
         case ReassignmentCompleted =>
@@ -77,7 +82,7 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+  def generateAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
     if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt)))
       CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
@@ -86,11 +91,11 @@ object ReassignPartitionsCommand extends Logging {
     if (duplicateReassignments.nonEmpty)
       throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
-    val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+    val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString)
     val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
-    val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
+    val topicPartitionsToReassign = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
 
     var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
     val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
@@ -99,18 +104,18 @@ object ReassignPartitionsCommand extends Logging {
         topicInfo._2.head._2.size)
       partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
     }
-    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic).toSeq)
     println("Current partition replica assignment\n\n%s"
-      .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
-    println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
+      .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+    println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
   }
 
-  def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) {
+  def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
     if(!opts.options.has(opts.reassignmentJsonFileOpt))
       CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
-    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
+    val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
     val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
@@ -125,28 +130,28 @@ object ReassignPartitionsCommand extends Logging {
         .mkString(". ")
       throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg))
     }
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)
     // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic))
+    val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
-      .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
+      .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
     // start the reassignment
     if(reassignPartitionsCommand.reassignPartitions())
-      println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
+      println("Successfully started reassignment of partitions %s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
     else
       println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
   }
 
-  private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
+  private def checkIfReassignmentSucceeded(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
   :Map[TopicAndPartition, ReassignmentStatus] = {
-    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned().mapValues(_.newReplicas)
     partitionsToBeReassigned.map { topicAndPartition =>
-      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
+      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkUtils, topicAndPartition._1,
         topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
     }
   }
 
-  def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
+  def checkIfPartitionReassignmentSucceeded(zkUtils: ZkUtils, topicAndPartition: TopicAndPartition,
                                             reassignedReplicas: Seq[Int],
                                             partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
                                             partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
@@ -155,7 +160,7 @@ object ReassignPartitionsCommand extends Logging {
       case Some(partition) => ReassignmentInProgress
       case None =>
         // check if the current replica assignment matches the expected one after reassignment
-        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
+        val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
         if(assignedReplicas == newReplicas)
           ReassignmentCompleted
         else {
@@ -203,31 +208,31 @@ object ReassignPartitionsCommand extends Logging {
   }
 }
 
-class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
+class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
   extends Logging {
   def reassignPartitions(): Boolean = {
     try {
-      val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
+      val validPartitions = partitions.filter(p => validatePartition(zkUtils, p._1.topic, p._1.partition))
       if(validPartitions.isEmpty) {
         false
       }
       else {
-        val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
-        ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
+        val jsonReassignmentData = zkUtils.getPartitionReassignmentZkData(validPartitions)
+        zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
         true
       }
     } catch {
       case ze: ZkNodeExistsException =>
-        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+        val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
         throw new AdminCommandFailedException("Partition reassignment currently in " +
         "progress for %s. Aborting operation".format(partitionsBeingReassigned))
       case e: Throwable => error("Admin command failed", e); false
     }
   }
 
-  def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = {
+  def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): Boolean = {
     // check if partition exists
-    val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
+    val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic)
     partitionsOpt match {
       case Some(partitions) =>
         if(partitions.contains(partition)) {

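For readers tracking the migration pattern in this file: the static ZkUtils.* helpers that previously took an explicit ZkClient become instance methods on a ZkUtils handle, so the client no longer threads through every call site. A minimal sketch of the new shape (the connect string, timeouts, security flag, and topic name are placeholders, not values from this commit; the four-argument factory appears in the TopicCommand hunk below):

    val zkUtils = ZkUtils("localhost:2181", 30000, 30000, false)
    val inFlight = zkUtils.getPartitionsBeingReassigned()          // was ZkUtils.getPartitionsBeingReassigned(zkClient)
    val replicas = zkUtils.getReplicasForPartition("my-topic", 0)  // was ZkUtils.getReplicasForPartition(zkClient, ...)
    zkUtils.close()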
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 3abac62..9fe2606 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import kafka.common.{Topic, AdminCommandFailedException}
 import kafka.utils.CommandLineUtils
 import kafka.utils._
+import kafka.utils.ZkUtils._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
@@ -30,6 +31,7 @@ import kafka.log.LogConfig
 import kafka.consumer.Whitelist
 import kafka.server.{ConfigType, OffsetManager}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.security.JaasUtils
 import kafka.coordinator.ConsumerCoordinator
 
 
@@ -49,33 +51,36 @@ object TopicCommand extends Logging {
 
     opts.checkArgs()
 
-    val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000)
+    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
+                          30000,
+                          30000,
+                          JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
     var exitCode = 0
     try {
       if(opts.options.has(opts.createOpt))
-        createTopic(zkClient, opts)
+        createTopic(zkUtils, opts)
       else if(opts.options.has(opts.alterOpt))
-        alterTopic(zkClient, opts)
+        alterTopic(zkUtils, opts)
       else if(opts.options.has(opts.listOpt))
-        listTopics(zkClient, opts)
+        listTopics(zkUtils, opts)
       else if(opts.options.has(opts.describeOpt))
-        describeTopic(zkClient, opts)
+        describeTopic(zkUtils, opts)
       else if(opts.options.has(opts.deleteOpt))
-        deleteTopic(zkClient, opts)
+        deleteTopic(zkUtils, opts)
     } catch {
       case e: Throwable =>
         println("Error while executing topic command : " + e.getMessage)
         error(Utils.stackTrace(e))
         exitCode = 1
     } finally {
-      zkClient.close()
+      zkUtils.close()
       System.exit(exitCode)
     }
 
   }
 
-  private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = {
-    val allTopics = ZkUtils.getAllTopics(zkClient).sorted
+  private def getTopics(zkUtils: ZkUtils, opts: TopicCommandOptions): Seq[String] = {
+    val allTopics = zkUtils.getAllTopics().sorted
     if (opts.options.has(opts.topicOpt)) {
       val topicsSpec = opts.options.valueOf(opts.topicOpt)
       val topicsFilter = new Whitelist(topicsSpec)
@@ -84,31 +89,31 @@ object TopicCommand extends Logging {
       allTopics
   }
 
-  def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+  def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
     if (Topic.hasCollisionChars(topic))
       println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
     if (opts.options.has(opts.replicaAssignmentOpt)) {
       val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs, update = false)
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
     } else {
       CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
       val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
       val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-      AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
+      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
     }
     println("Created topic \"%s\".".format(topic))
   }
 
-  def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-    val topics = getTopics(zkClient, opts)
+  def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+    val topics = getTopics(zkUtils, opts)
     if (topics.length == 0) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
           opts.options.valueOf(opts.zkConnectOpt)))
     }
     topics.foreach { topic =>
-      val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+      val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
       if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
         println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
         println("         Going forward, please use kafka-configs.sh for this functionality")
@@ -118,7 +123,7 @@ object TopicCommand extends Logging {
         // compile the final set of configs
         configs.putAll(configsToBeAdded)
         configsToBeDeleted.foreach(config => configs.remove(config))
-        AdminUtils.changeTopicConfig(zkClient, topic, configs)
+        AdminUtils.changeTopicConfig(zkUtils, topic, configs)
         println("Updated config for topic \"%s\".".format(topic))
       }
 
@@ -130,16 +135,16 @@ object TopicCommand extends Logging {
           "logic or ordering of the messages will be affected")
         val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
-        AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+        AdminUtils.addPartitions(zkUtils, topic, nPartitions, replicaAssignmentStr)
         println("Adding partitions succeeded!")
       }
     }
   }
 
-  def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
-    val topics = getTopics(zkClient, opts)
+  def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+    val topics = getTopics(zkUtils, opts)
     for(topic <- topics) {
-      if (ZkUtils.pathExists(zkClient,ZkUtils.getDeleteTopicPath(topic))) {
+      if (zkUtils.pathExists(getDeleteTopicPath(topic))) {
         println("%s - marked for deletion".format(topic))
       } else {
         println(topic)
@@ -147,8 +152,8 @@ object TopicCommand extends Logging {
     }
   }
 
-  def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-    val topics = getTopics(zkClient, opts)
+  def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+    val topics = getTopics(zkUtils, opts)
     if (topics.length == 0) {
       throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
           opts.options.valueOf(opts.zkConnectOpt)))
@@ -158,7 +163,7 @@ object TopicCommand extends Logging {
         if (Topic.InternalTopics.contains(topic)) {
           throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
         } else {
-          ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
+          zkUtils.createPersistentPath(getDeleteTopicPath(topic))
           println("Topic %s is marked for deletion.".format(topic))
           println("Note: This will have no impact if delete.topic.enable is not set to true.")
         }
@@ -173,20 +178,20 @@ object TopicCommand extends Logging {
     }
   }
 
-  def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-    val topics = getTopics(zkClient, opts)
+  def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+    val topics = getTopics(zkUtils, opts)
     val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
     val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
     val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false
-    val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
+    val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
     for (topic <- topics) {
-      ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
+      zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
         case Some(topicPartitionAssignment) =>
           val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
           val describePartitions: Boolean = !reportOverriddenConfigs
           val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
           if (describeConfigs) {
-            val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+            val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
             if (!reportOverriddenConfigs || configs.size() != 0) {
               val numPartitions = topicPartitionAssignment.size
               val replicationFactor = topicPartitionAssignment.head._2.size
@@ -196,8 +201,8 @@ object TopicCommand extends Logging {
           }
           if (describePartitions) {
             for ((partitionId, assignedReplicas) <- sortedPartitions) {
-              val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
-              val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
+              val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
+              val leader = zkUtils.getLeaderForPartition(topic, partitionId)
               if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
                   (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
                   (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {

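The entry point above is where the refactoring adds behavior rather than just re-plumbing: the ZkUtils factory takes a fourth argument derived from the JAAS login configuration, presumably so ZooKeeper nodes are created with secure ACLs when a login file is configured. The same bootstrap in isolation (only the connect string is a placeholder):

    val secure = JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))
    val zkUtils = ZkUtils("localhost:2181", 30000, 30000, secure)
    try zkUtils.getAllTopics().sorted.foreach(println)
    finally zkUtils.close()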
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 68c7e7f..6ae0347 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -27,7 +27,7 @@ import kafka.utils.{CoreUtils, Logging}
 import java.util.Properties
 import util.Random
 import kafka.network.BlockingChannel
-import kafka.utils.ZkUtils._
+import kafka.utils.ZkUtils
 import org.I0Itec.zkclient.ZkClient
 import java.io.IOException
 
@@ -108,11 +108,11 @@ object ClientUtils extends Logging{
    /**
     * Creates a blocking channel to a random broker
     */
-   def channelToAnyBroker(zkClient: ZkClient, socketTimeoutMs: Int = 3000) : BlockingChannel = {
+   def channelToAnyBroker(zkUtils: ZkUtils, socketTimeoutMs: Int = 3000) : BlockingChannel = {
      var channel: BlockingChannel = null
      var connected = false
      while (!connected) {
-       val allBrokers = getAllBrokerEndPointsForChannel(zkClient, SecurityProtocol.PLAINTEXT)
+       val allBrokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
        Random.shuffle(allBrokers).find { broker =>
          trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
          try {
@@ -137,8 +137,8 @@ object ClientUtils extends Logging{
    /**
     * Creates a blocking channel to the offset manager of the given group
     */
-   def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
-     var queryChannel = channelToAnyBroker(zkClient)
+   def channelToOffsetManager(group: String, zkUtils: ZkUtils, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
+     var queryChannel = channelToAnyBroker(zkUtils)
 
      var offsetManagerChannelOpt: Option[BlockingChannel] = None
 
@@ -149,7 +149,7 @@ object ClientUtils extends Logging{
        while (!coordinatorOpt.isDefined) {
          try {
            if (!queryChannel.isConnected)
-             queryChannel = channelToAnyBroker(zkClient)
+             queryChannel = channelToAnyBroker(zkUtils)
            debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
            queryChannel.send(ConsumerMetadataRequest(group))
            val response = queryChannel.receive()

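Note the import change at the top of this hunk: the wildcard import kafka.utils.ZkUtils._ narrows to import kafka.utils.ZkUtils, since the helpers are no longer static. Callers pass the handle instead, e.g. (the group name is a placeholder):

    val channel = ClientUtils.channelToAnyBroker(zkUtils)
    val offsetChannel = ClientUtils.channelToOffsetManager("my-group", zkUtils)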
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ee332ed..59d025b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -42,7 +42,7 @@ class Partition(val topic: String,
                 replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
-  private val zkClient = replicaManager.zkClient
+  private val zkUtils = replicaManager.zkUtils
   private val assignedReplicaMap = new Pool[Int, Replica]
   // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
   private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
@@ -87,7 +87,7 @@ class Partition(val topic: String,
       case None =>
         if (isReplicaLocal(replicaId)) {
           val config = LogConfig.fromProps(logManager.defaultConfig.originals,
-                                           AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic))
+                                           AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
           val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
           val offsetMap = checkpoint.read
@@ -426,7 +426,7 @@ class Partition(val topic: String,
 
   private def updateIsr(newIsr: Set[Replica]) {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
-    val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
+    val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
       newLeaderAndIsr, controllerEpoch, zkVersion)
 
     if(updateSucceeded) {

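Partition itself only swaps the field it caches from the ReplicaManager; helpers that remain static, such as AdminUtils.fetchEntityConfig and ReplicationUtils.updateLeaderAndIsr, simply take zkUtils as their first argument. For instance (the topic name is a placeholder):

    val topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, "my-topic")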
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index eb44c31..a600d5d 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -44,7 +44,7 @@ trait NotificationHandler {
  * @param changeExpirationMs
  * @param time
  */
-class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
+class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
                                        private val seqNodeRoot: String,
                                        private val seqNodePrefix: String,
                                        private val notificationHandler: NotificationHandler,
@@ -56,8 +56,8 @@ class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
    * create seqNodeRoot and begin watching for any new children nodes.
    */
   def init() {
-    ZkUtils.makeSurePersistentPathExists(zkClient, seqNodeRoot)
-    zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
+    zkUtils.makeSurePersistentPathExists(seqNodeRoot)
+    zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
     processAllNotifications()
   }
 
@@ -65,7 +65,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
    * Process all changes
    */
   def processAllNotifications() {
-    val changes = zkClient.getChildren(seqNodeRoot)
+    val changes = zkUtils.zkClient.getChildren(seqNodeRoot)
     processNotifications(changes.asScala.sorted)
   }
 
@@ -80,7 +80,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
         val changeId = changeNumber(notification)
         if (changeId > lastExecutedChange) {
           val changeZnode = seqNodeRoot + "/" + notification
-          val (data, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+          val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
           data map (notificationHandler.processNotification(_)) getOrElse(logger.warn(s"read null data from $changeZnode when processing notification $notification"))
         }
         lastExecutedChange = changeId
@@ -97,11 +97,11 @@ class ZkNodeChangeNotificationListener(private val zkClient: ZkClient,
   private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
     for (notification <- notifications.sorted) {
       val notificationNode = seqNodeRoot + "/" + notification
-      val (data, stat) = ZkUtils.readDataMaybeNull(zkClient, notificationNode)
+      val (data, stat) = zkUtils.readDataMaybeNull(notificationNode)
       if (data.isDefined) {
         if (now - stat.getCtime > changeExpirationMs) {
           debug(s"Purging change notification $notificationNode")
-          ZkUtils.deletePath(zkClient, notificationNode)
+          zkUtils.deletePath(notificationNode)
         }
       }
     }

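The listener keeps one deliberate escape hatch, zkUtils.zkClient, because child-change subscription is not wrapped by the new class; everything else it needs is on the handle. A sketch of its read-and-purge cycle using the same primitives (paths are hypothetical):

    zkUtils.makeSurePersistentPathExists("/my-notifications")
    val (data, stat) = zkUtils.readDataMaybeNull("/my-notifications/change_0000000001")
    data.foreach(d => println(s"processing notification data: $d"))
    zkUtils.deletePath("/my-notifications/change_0000000001")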
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 49b683f..e73faf2 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -26,7 +26,7 @@ import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.CoreUtils.inLock
-import kafka.utils.ZkUtils._
+import kafka.utils.ZkUtils
 import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
@@ -39,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger
  */
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
-                             private val zkClient : ZkClient)
+                             private val zkUtils : ZkUtils)
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
                                        config.clientId, config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
@@ -62,7 +62,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         }
 
         trace("Partitions without leader %s".format(noLeaderPartitionSet))
-        val brokers = getAllBrokerEndPointsForChannel(zkClient, SecurityProtocol.PLAINTEXT)
+        val brokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
         val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                             brokers,
                                                             config.clientId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 849284a..5a1bdd0 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -41,19 +41,19 @@ object PartitionAssignor {
   }
 }
 
-class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {
+class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkUtils: ZkUtils) {
   val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
-    val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
+    val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkUtils, excludeInternalTopics)
     myTopicCount.getConsumerThreadIdsPerTopic
   }
 
   val partitionsForTopic: collection.Map[String, Seq[Int]] =
-    ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIds.keySet.toSeq)
+    zkUtils.getPartitionsForTopics(myTopicThreadIds.keySet.toSeq)
 
   val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
-    ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)
+    zkUtils.getConsumersPerTopic(group, excludeInternalTopics)
 
-  val consumers: Seq[String] = ZkUtils.getConsumersInGroup(zkClient, group).sorted
+  val consumers: Seq[String] = zkUtils.getConsumersInGroup(group).sorted
 }
 
 /**

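AssignmentContext shows the payoff most clearly: four ZooKeeper reads collapse into four instance calls with no client parameter threaded through. Constructing one (group and consumer id are placeholders):

    val ctx = new AssignmentContext("my-group", "console-consumer-1", excludeInternalTopics = true, zkUtils)
    // ctx.partitionsForTopic is populated via zkUtils.getPartitionsForTopics(...)
    // ctx.consumers is populated via zkUtils.getConsumersInGroup("my-group").sorted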
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 6994c8e..5706d3c 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -56,9 +56,9 @@ private[kafka] object TopicCount extends Logging {
     consumerThreadIdsPerTopicMap
   }
 
-  def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = {
+  def constructTopicCount(group: String, consumerId: String, zkUtils: ZkUtils, excludeInternalTopics: Boolean) : TopicCount = {
     val dirs = new ZKGroupDirs(group)
-    val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1
+    val topicCountString = zkUtils.readData(dirs.consumerRegistryDir + "/" + consumerId)._1
     var subscriptionPattern: String = null
     var topMap: Map[String, Int] = null
     try {
@@ -94,15 +94,15 @@ private[kafka] object TopicCount extends Logging {
           new Whitelist(regex)
         else
           new Blacklist(regex)
-      new WildcardTopicCount(zkClient, consumerId, filter, numStreams, excludeInternalTopics)
+      new WildcardTopicCount(zkUtils, consumerId, filter, numStreams, excludeInternalTopics)
     }
   }
 
   def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
     new StaticTopicCount(consumerIdString, topicCount)
 
-  def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkClient: ZkClient, excludeInternalTopics: Boolean) =
-    new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams, excludeInternalTopics)
+  def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkUtils: ZkUtils, excludeInternalTopics: Boolean) =
+    new WildcardTopicCount(zkUtils, consumerIdString, filter, numStreams, excludeInternalTopics)
 
 }
 
@@ -125,13 +125,13 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
   def pattern = TopicCount.staticPattern
 }
 
-private[kafka] class WildcardTopicCount(zkClient: ZkClient,
+private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
                                         consumerIdString: String,
                                         topicFilter: TopicFilter,
                                         numStreams: Int,
                                         excludeInternalTopics: Boolean) extends TopicCount {
   def getConsumerThreadIdsPerTopic = {
-    val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
+    val wildcardTopics = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath)
                          .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
     TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
   }