Posted to commits@kafka.apache.org by ju...@apache.org on 2018/01/03 22:02:26 UTC

[kafka] branch trunk updated: MINOR: Fix zk client session state metric names and various async zk clean-ups

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 98296f8  MINOR: Fix zk client session state metric names and various async zk clean-ups
98296f8 is described below

commit 98296f852f334067553e541d6ecdfa624f0eb689
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Jan 3 14:02:14 2018 -0800

    MINOR: Fix zk client session state metric names and various async zk clean-ups
    
    - Fix zk session state and session change rate metric names: the type
    should be SessionExpireListener instead of KafkaHealthCheck. A test
    verifying the fix is included.
    - Handle missing controller in controlled shutdown in the same way as if
    the broker is not registered (i.e. retry after backoff).
    - Restructure BrokerInfo to reduce duplication. It now contains a
    Broker instance and the JSON serde is done in BrokerIdZNode
    since `Broker` does not contain all the fields.
    - Remove dead code from `ZooKeeperClient.initialize` and remove
    redundant `close` calls.
    - Move ACL handling and persistent paths definition from ZkUtils to
    ZkData (and call ZkData from ZkUtils).
    - Remove ZooKeeperClientWrapper and ZooKeeperClientMetrics from
    ZkUtils (avoids metrics clash if third party users create a ZkUtils
    instance in the same process as the broker).
    - Introduce factory method in KafkaZkClient that creates
    ZooKeeperClient and remove metric name defaults from
    ZooKeeperClient (see the sketch after this list).
    - Fix a few instances where ZooKeeperClient was not closed in tests.
    - Update a few TestUtils methods to use KafkaZkClient instead of
    ZkUtils.
    - Add test verifying SessionState metric.
    - Various clean-ups.
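    
    A minimal sketch of the new factory in action, mirroring the call sites
    in this patch (the connection string is an illustrative value):
    
        val time = Time.SYSTEM
        val zkClient = KafkaZkClient("localhost:2181", JaasUtils.isZkSecurityEnabled,
          30000, 30000, Int.MaxValue, time)
        try {
          // typed reads/writes, e.g. zkClient.getClusterId
        } finally zkClient.close()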
    
    Testing: mostly relying on existing tests, but added a couple
    of new tests as mentioned above.
    
    Author: Ismael Juma <is...@juma.me.uk>
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Jun Rao <ju...@gmail.com>
    
    Closes #4359 from ijuma/kafka-6320-kafka-health-zk-metrics-follow-up
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   5 +-
 .../PreferredReplicaLeaderElectionCommand.scala    |   5 +-
 .../kafka/admin/ReassignPartitionsCommand.scala    |   4 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala |   5 +-
 core/src/main/scala/kafka/cluster/Broker.scala     | 129 +-----------
 .../common/ZkNodeChangeNotificationListener.scala  |   1 -
 .../scala/kafka/controller/KafkaController.scala   |   4 +-
 .../transaction/ProducerIdManager.scala            |   2 +-
 .../kafka/security/auth/SimpleAclAuthorizer.scala  |   6 +-
 core/src/main/scala/kafka/server/KafkaServer.scala | 114 +++++------
 core/src/main/scala/kafka/utils/Json.scala         |   6 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 185 +++++------------
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  54 ++---
 core/src/main/scala/kafka/zk/ZkData.scala          | 219 ++++++++++++++++++---
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    |  44 ++---
 .../kafka/api/AdminClientIntegrationTest.scala     |  35 ++--
 .../AdminClientWithPoliciesIntegrationTest.scala   |  12 +-
 .../integration/kafka/api/BaseConsumerTest.scala   |   4 +-
 .../kafka/api/BaseProducerSendTest.scala           |  18 +-
 .../integration/kafka/api/BaseQuotaTest.scala      |   2 +-
 .../integration/kafka/api/ConsumerBounceTest.scala |   4 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |   2 +-
 .../kafka/api/EndToEndClusterIdTest.scala          |   2 +-
 .../kafka/api/LegacyAdminClientTest.scala          |   2 +-
 .../integration/kafka/api/LogAppendTimeTest.scala  |   2 +-
 .../scala/integration/kafka/api/MetricsTest.scala  |   8 +-
 .../kafka/api/PlaintextConsumerTest.scala          |  46 ++---
 .../kafka/api/PlaintextProducerSendTest.scala      |   2 +-
 .../integration/kafka/api/ProducerBounceTest.scala |   6 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   2 +-
 .../kafka/api/TransactionsBounceTest.scala         |   4 +-
 .../integration/kafka/api/TransactionsTest.scala   |   8 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   2 +-
 .../unit/kafka/admin/DeleteConsumerGroupTest.scala |  20 +-
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |   2 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala    |   2 +-
 .../admin/ReassignPartitionsCommandTest.scala      |  12 +-
 .../unit/kafka/cluster/BrokerEndPointTest.scala    |  20 +-
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |   3 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |   2 +-
 .../controller/ControllerIntegrationTest.scala     |  22 +--
 .../kafka/integration/AutoOffsetResetTest.scala    |   2 +-
 .../scala/unit/kafka/integration/FetcherTest.scala |   2 +-
 .../kafka/integration/KafkaServerTestHarness.scala |  17 ++
 .../unit/kafka/integration/PrimitiveApiTest.scala  |   8 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |   2 +-
 .../scala/unit/kafka/metrics/MetricsTest.scala     |  17 +-
 .../unit/kafka/producer/SyncProducerTest.scala     |   2 +-
 .../kafka/security/auth/ZkAuthorizationTest.scala  |  14 +-
 .../server/AddPartitionsToTxnRequestTest.scala     |   3 +-
 .../server/AlterReplicaLogDirsRequestTest.scala    |   4 +-
 .../kafka/server/CreateTopicsRequestTest.scala     |   2 +-
 .../server/CreateTopicsRequestWithPolicyTest.scala |   3 +-
 .../kafka/server/DeleteTopicsRequestTest.scala     |  10 +-
 .../kafka/server/DescribeLogDirsRequestTest.scala  |   2 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala    |   2 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |   5 +-
 .../unit/kafka/server/LogDirFailureTest.scala      |   4 +-
 .../unit/kafka/server/MetadataRequestTest.scala    |  18 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  41 ++--
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |   2 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  15 +-
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |   8 +-
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 120 ++++++-----
 .../integration/InternalTopicIntegrationTest.java  |   9 +-
 .../streams/integration/utils/KafkaEmbedded.java   |  38 ++--
 67 files changed, 684 insertions(+), 695 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 53aa2c1..cf01a5f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -27,7 +27,6 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
 import kafka.utils.CommandLineUtils
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@@ -64,8 +63,8 @@ object ConfigCommand extends Config {
     opts.checkArgs()
 
     val time = Time.SYSTEM
-    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue, time)
-    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+    val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
+      Int.MaxValue, time)
     val adminZkClient = new AdminZkClient(zkClient)
 
     try {
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index e36b25b..89ab580 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -20,7 +20,6 @@ import joptsimple.OptionParser
 import kafka.utils._
 import kafka.common.AdminCommandFailedException
 import kafka.zk.KafkaZkClient
-import kafka.zookeeper.ZooKeeperClient
 
 import collection._
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -54,12 +53,10 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
 
     val zkConnect = options.valueOf(zkConnectOpt)
-    var zooKeeperClient: ZooKeeperClient = null
     var zkClient: KafkaZkClient = null
     try {
       val time = Time.SYSTEM
-      zooKeeperClient = new ZooKeeperClient(zkConnect, 30000, 30000, Int.MaxValue, time)
-      zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+      zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
 
       val partitionsForPreferredReplicaElection =
         if (!options.has(jsonFileOpt))
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 6bcbe91..ed9414b 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -26,7 +26,6 @@ import kafka.log.LogConfig._
 import kafka.server.{ConfigType, DynamicConfig}
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
 import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.TopicPartitionReplica
@@ -50,8 +49,7 @@ object ReassignPartitionsCommand extends Logging {
     val opts = validateAndParseArgs(args)
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
     val time = Time.SYSTEM
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, 30000, 30000, Int.MaxValue, time)
-    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+    val zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
 
     val adminClientOpt = createAdminClient(opts)
 
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index dcf970a..075252d 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -27,7 +27,6 @@ import kafka.log.LogConfig
 import kafka.server.ConfigType
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.security.JaasUtils
@@ -55,8 +54,8 @@ object TopicCommand extends Logging {
     opts.checkArgs()
 
     val time = Time.SYSTEM
-    val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue, time)
-    val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled, time)
+    val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
+      Int.MaxValue, time)
 
     var exitCode = 0
     try {
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index df3be98..425eafc 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -17,139 +17,16 @@
 
 package kafka.cluster
 
-import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
-import kafka.utils.Json
+import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.Time
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
 
 /**
  * A Kafka broker.
- * A broker has an id and a collection of end-points.
- * Each end-point is (host, port, protocolType).
+ * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map.
+ * Each end-point is (host, port, listenerName).
  */
-object Broker {
-
-  private val HostKey = "host"
-  private val PortKey = "port"
-  private val VersionKey = "version"
-  private val EndpointsKey = "endpoints"
-  private val RackKey = "rack"
-  private val JmxPortKey = "jmx_port"
-  private val ListenerSecurityProtocolMapKey = "listener_security_protocol_map"
-  private val TimestampKey = "timestamp"
-
-  /**
-    * Create a broker object from id and JSON string.
-    *
-    * @param id
-    * @param brokerInfoString
-    *
-    * Version 1 JSON schema for a broker is:
-    * {
-    *   "version":1,
-    *   "host":"localhost",
-    *   "port":9092
-    *   "jmx_port":9999,
-    *   "timestamp":"2233345666"
-    * }
-    *
-    * Version 2 JSON schema for a broker is:
-    * {
-    *   "version":2,
-    *   "host":"localhost",
-    *   "port":9092,
-    *   "jmx_port":9999,
-    *   "timestamp":"2233345666",
-    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
-    * }
-    *
-    * Version 3 JSON schema for a broker is:
-    * {
-    *   "version":3,
-    *   "host":"localhost",
-    *   "port":9092,
-    *   "jmx_port":9999,
-    *   "timestamp":"2233345666",
-    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
-    *   "rack":"dc1"
-    * }
-    *
-    * Version 4 (current) JSON schema for a broker is:
-    * {
-    *   "version":4,
-    *   "host":"localhost",
-    *   "port":9092,
-    *   "jmx_port":9999,
-    *   "timestamp":"2233345666",
-    *   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
-    *   "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
-    *   "rack":"dc1"
-    * }
-    */
-  def createBroker(id: Int, brokerInfoString: String): Broker = {
-    if (brokerInfoString == null)
-      throw new BrokerNotAvailableException(s"Broker id $id does not exist")
-    try {
-      Json.parseFull(brokerInfoString) match {
-        case Some(js) =>
-          val brokerInfo = js.asJsonObject
-          val version = brokerInfo(VersionKey).to[Int]
-
-          val endpoints =
-            if (version < 1)
-              throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
-            else if (version == 1) {
-              val host = brokerInfo(HostKey).to[String]
-              val port = brokerInfo(PortKey).to[Int]
-              val securityProtocol = SecurityProtocol.PLAINTEXT
-              val endPoint = new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
-              Seq(endPoint)
-            }
-            else {
-              val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map(
-                _.to[Map[String, String]].map { case (listenerName, securityProtocol) =>
-                  new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
-                })
-              val listeners = brokerInfo(EndpointsKey).to[Seq[String]]
-              listeners.map(EndPoint.createEndPoint(_, securityProtocolMap))
-            }
-
-          val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]])
-          Broker(id, endpoints, rack)
-        case None =>
-          throw new BrokerNotAvailableException(s"Broker id $id does not exist")
-      }
-    } catch {
-      case t: Throwable =>
-        throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
-    }
-  }
-
-  def toJsonBytes(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
-                  rack: Option[String]): Array[Byte] = {
-    val jsonMap = collection.mutable.Map(VersionKey -> version,
-      HostKey -> host,
-      PortKey -> port,
-      EndpointsKey -> advertisedEndpoints.map(_.connectionString).toBuffer.asJava,
-      JmxPortKey -> jmxPort,
-      TimestampKey -> Time.SYSTEM.milliseconds().toString
-    )
-    rack.foreach(rack => if (version >= 3) jsonMap += (RackKey -> rack))
-
-    if (version >= 4) {
-      jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint =>
-        endPoint.listenerName.value -> endPoint.securityProtocol.name
-      }.toMap.asJava)
-    }
-    Json.encodeAsBytes(jsonMap.asJava)
-  }
-}
-
 case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
 
   private val endPointsMap = endPoints.map { endPoint =>
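
The version 1-4 registration JSON schemas and their parsing moved out of
Broker into BrokerIdZNode (see the ZkData.scala hunk below). A minimal
sketch of the new decode path, using the signatures visible in this patch
(the id and bytes are illustrative):

    import kafka.zk.BrokerIdZNode

    // decode returns a BrokerInfo; the wrapped Broker carries id, endPoints and rack
    val brokerInfo = BrokerIdZNode.decode(1, registrationJsonBytes)
    val endPoints = brokerInfo.broker.endPoints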
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 0783f61..5179851 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -143,7 +143,6 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
   object ZkStateChangeHandler extends  StateChangeHandler {
     override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
     override def afterInitializingSession(): Unit = addChangeNotification
-    override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
   }
 }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index ca8422e..6b5c34e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -54,7 +54,8 @@ object KafkaController extends Logging {
 
 }
 
-class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo,
+                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   this.logIdent = s"[Controller id=${config.brokerId}] "
 
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
@@ -146,7 +147,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   def startup() = {
     zkClient.registerStateChangeHandler(new StateChangeHandler {
       override val name: String = StateChangeHandlers.ControllerHandler
-      override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
       override def afterInitializingSession(): Unit = {
         eventManager.put(RegisterBrokerAndReelect)
       }
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 9c815bc..c3c9f7c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.transaction
 import java.nio.charset.StandardCharsets
 
 import kafka.common.KafkaException
-import kafka.utils.{Json, Logging, ZkUtils}
+import kafka.utils.{Json, Logging}
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
 
 import scala.collection.JavaConverters._
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 74bc809..c439f5e 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -28,7 +28,6 @@ import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 
@@ -92,9 +91,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     val zkMaxInFlightRequests = configs.get(SimpleAclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
 
     val time = Time.SYSTEM
-    val zooKeeperClient = new ZooKeeperClient(zkUrl, zkSessionTimeOutMs, zkConnectionTimeoutMs, zkMaxInFlightRequests,
-      time, "kafka.security", "SimpleAclAuthorizer")
-    zkClient = new KafkaZkClient(zooKeeperClient, kafkaConfig.zkEnableSecureAcls, time)
+    zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
+      zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer")
     zkClient.createAclPaths()
 
     loadCache()
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 8643233..d073584 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,8 +25,8 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_9_0
-import kafka.cluster.{Broker, EndPoint}
-import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, KafkaException}
+import kafka.cluster.Broker
+import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
@@ -37,14 +37,12 @@ import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import kafka.zk.{BrokerInfo, KafkaZkClient}
-import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
@@ -95,7 +93,8 @@ object KafkaServer {
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
+class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
+                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
   private val startupComplete = new AtomicBoolean(false)
   private val isShuttingDown = new AtomicBoolean(false)
   private val isStartingUp = new AtomicBoolean(false)
@@ -238,31 +237,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         replicaManager = createReplicaManager(isShuttingDown)
         replicaManager.startup()
 
-        /* tell everyone we are alive */
-        val listeners = config.advertisedListeners.map { endpoint =>
-          if (endpoint.port == 0)
-            endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
-          else
-            endpoint
-        }
-
-        val updatedEndpoints = listeners.map(endpoint =>
-          if (endpoint.host == null || endpoint.host.trim.isEmpty)
-            endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
-          else
-            endpoint
-        )
-
-        // the default host and port are here for compatibility with older clients that only support PLAINTEXT
-        // we choose the first plaintext port, if there is one
-        // or we register an empty endpoint, which means that older clients will not be able to connect
-        val plaintextEndpoint = updatedEndpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse(
-          new EndPoint(null, -1, null, null))
-
-        val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-        val brokerInfo = new BrokerInfo(config.brokerId,
-          plaintextEndpoint.host, plaintextEndpoint.port,
-          updatedEndpoints, jmxPort, config.rack, config.interBrokerProtocolVersion)
+        val brokerInfo = createBrokerInfo
         zkClient.registerBrokerInZk(brokerInfo)
 
         // Now that the broker id is successfully registered, checkpoint it
@@ -342,6 +317,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private def initZkClient(time: Time): Unit = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
 
+    def createZkClient(zkConnect: String, isSecure: Boolean) =
+      KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+        config.zkMaxInFlightRequests, time)
+
     val chrootIndex = config.zkConnect.indexOf("/")
     val chrootOption = {
       if (chrootIndex > 0) Some(config.zkConnect.substring(chrootIndex))
@@ -357,24 +336,39 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     // make sure chroot path exists
     chrootOption.foreach { chroot =>
       val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
-      val zooKeeperClient = new ZooKeeperClient(zkConnForChrootCreation, config.zkSessionTimeoutMs,
-        config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
-      val zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, time)
+      val zkClient = createZkClient(zkConnForChrootCreation, secureAclsEnabled)
       zkClient.makeSurePersistentPathExists(chroot)
       info(s"Created zookeeper path $chroot")
       zkClient.close()
     }
 
-    val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, time)
-    _zkClient = new KafkaZkClient(zooKeeperClient, secureAclsEnabled, time)
+    _zkClient = createZkClient(config.zkConnect, secureAclsEnabled)
     _zkClient.createTopLevelPaths()
   }
 
-  def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
+  private def getOrGenerateClusterId(zkClient: KafkaZkClient): String = {
     zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
   }
 
+  private def createBrokerInfo: BrokerInfo = {
+    val listeners = config.advertisedListeners.map { endpoint =>
+      if (endpoint.port == 0)
+        endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
+      else
+        endpoint
+    }
+
+    val updatedEndpoints = listeners.map(endpoint =>
+      if (endpoint.host == null || endpoint.host.trim.isEmpty)
+        endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
+      else
+        endpoint
+    )
+
+    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+    BrokerInfo(Broker(config.brokerId, updatedEndpoints, config.rack), config.interBrokerProtocolVersion, jmxPort)
+  }
+
   /**
    *  Performs controlled shutdown
    */
@@ -438,22 +432,29 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           // 1. Find the controller and establish a connection to it.
 
           // Get the current controller info. This is to ensure we use the most recent info to issue the
-          // controlled shutdown request
-          val controllerId = zkClient.getControllerId.getOrElse(throw new KafkaException("Controller doesn't exist"))
-          //If this method returns None ignore and try again
-          zkClient.getBroker(controllerId).foreach { broker =>
-            // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
-            // attempt, connect to the most recent controller
-            if (ioException || broker != prevController) {
-
-              ioException = false
-
-              if (prevController != null)
-                networkClient.close(node(prevController).idString)
-
-              prevController = broker
-              metadataUpdater.setNodes(Seq(node(prevController)).asJava)
-            }
+          // controlled shutdown request.
+          // If the controller id or the broker registration is missing, we sleep and retry (if there are remaining retries)
+          zkClient.getControllerId match {
+            case Some(controllerId) =>
+              zkClient.getBroker(controllerId) match {
+                case Some(broker) =>
+                  // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
+                  // attempt, connect to the most recent controller
+                  if (ioException || broker != prevController) {
+
+                    ioException = false
+
+                    if (prevController != null)
+                      networkClient.close(node(prevController).idString)
+
+                    prevController = broker
+                    metadataUpdater.setNodes(Seq(node(prevController)).asJava)
+                  }
+                case None =>
+                  info(s"Broker registration for controller $controllerId is not available (i.e. the Controller's ZK session expired)")
+              }
+            case None =>
+              info("No controller registered in ZooKeeper")
           }
 
           // 2. issue a controlled shutdown to the controller
@@ -477,14 +478,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                 info("Controlled shutdown succeeded")
               }
               else {
-                info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.asScala.mkString(",")))
-                info("Error code from controller: %d".format(shutdownResponse.error.code))
+                info(s"Remaining partitions to move: ${shutdownResponse.partitionsRemaining.asScala.mkString(",")}")
+                info(s"Error from controller: ${shutdownResponse.error}")
               }
             }
             catch {
               case ioe: IOException =>
                 ioException = true
-                warn("Error during controlled shutdown, possibly because leader movement took longer than the configured controller.socket.timeout.ms and/or request.timeout.ms: %s".format(ioe.getMessage))
+                warn("Error during controlled shutdown, possibly because leader movement took longer than the " +
+                  s"configured controller.socket.timeout.ms and/or request.timeout.ms: ${ioe.getMessage}")
                 // ignore and try again
             }
           }
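
With the restructuring, BrokerInfo simply wraps a Broker plus the
inter-broker protocol version and JMX port, as createBrokerInfo above shows.
A minimal sketch of registration (the endpoint list and rack are
illustrative):

    val broker = Broker(0, endpoints, rack = Some("dc1"))
    val brokerInfo = BrokerInfo(broker, config.interBrokerProtocolVersion, jmxPort = -1)
    zkClient.registerBrokerInZk(brokerInfo)  // serializes via brokerInfo.toJsonBytes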
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index e8e7d8a..cbb8dac 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -16,8 +16,6 @@
  */
 package kafka.utils
 
-import java.nio.charset.StandardCharsets
-
 import com.fasterxml.jackson.core.JsonProcessingException
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.utils.json.JsonValue
@@ -54,6 +52,10 @@ object Json {
     try Option(mapper.readTree(input)).map(JsonValue(_))
     catch { case _: JsonProcessingException => None }
 
+  def tryParseBytes(input: Array[Byte]): Either[JsonProcessingException, JsonValue] =
+    try Right(mapper.readTree(input)).right.map(JsonValue(_))
+    catch { case e: JsonProcessingException => Left(e) }
+
   /**
    * Encode an object into a JSON string. This method accepts any type T where
    *   T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
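
A minimal sketch of the new Either-based parser added above (the payload and
error handling are illustrative):

    Json.tryParseBytes("""{"version":1}""".getBytes(StandardCharsets.UTF_8)) match {
      case Right(js) => js.asJsonObject("version").to[Int]
      case Left(e)   => throw new IllegalArgumentException("Invalid JSON", e)
    }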
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 1f665e6..ac8b932 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -18,7 +18,7 @@
 package kafka.utils
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.CountDownLatch
 
 import kafka.admin._
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
@@ -26,15 +26,11 @@ import kafka.cluster._
 import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
 import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
-import kafka.metrics.KafkaMetricsGroup
-import kafka.server.ConfigType
-import kafka.utils.ZkUtils._
-import com.yammer.metrics.core.MetricName
+import kafka.zk.{BrokerIdZNode, ZkData}
 import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
@@ -71,28 +67,10 @@ object ZkUtils {
   val ConfigChangesPath = s"$ConfigPath/changes"
   val ConfigUsersPath = s"$ConfigPath/users"
   val ProducerIdBlockPath = "/latest_producer_id_block"
-  // Important: it is necessary to add any new top level Zookeeper path to the Seq
-  val SecureZkRootPaths = Seq(AdminPath,
-                              BrokersPath,
-                              ClusterPath,
-                              ConfigPath,
-                              ControllerPath,
-                              ControllerEpochPath,
-                              IsrChangeNotificationPath,
-                              KafkaAclPath,
-                              KafkaAclChangesPath,
-                              ProducerIdBlockPath,
-                              LogDirEventNotificationPath)
-
-  // Important: it is necessary to add any new top level Zookeeper path that contains
-  //            sensitive information that should not be world readable to the Seq
-  val SensitiveZkRootPaths = Seq(ConfigUsersPath)
-
-  def withMetrics(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean,
-                  time: Time): ZkUtils = {
-    val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
-    new ZkUtils(new ZooKeeperClientMetrics(zkClient, time), zkConnection, isZkSecurityEnabled)
-  }
+
+  val SecureZkRootPaths = ZkData.SecureRootPaths
+
+  val SensitiveZkRootPaths = ZkData.SensitiveRootPaths
 
   def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
     val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
@@ -117,24 +95,12 @@ object ZkUtils {
     (zkClient, zkConnection)
   }
 
-  def sensitivePath(path: String): Boolean = {
-    path != null && SensitiveZkRootPaths.exists(path.startsWith(_))
-  }
-
   @deprecated("This is deprecated, use defaultAcls(isSecure, path) which doesn't make sensitive data world readable", since = "0.10.2.1")
   def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = defaultAcls(isSecure, "")
 
-  def defaultAcls(isSecure: Boolean, path: String): java.util.List[ACL] = {
-    if (isSecure) {
-      val list = new java.util.ArrayList[ACL]
-      list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
-      if (!sensitivePath(path)) {
-        list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
-      }
-      list
-    } else
-      ZooDefs.Ids.OPEN_ACL_UNSAFE
-  }
+  def sensitivePath(path: String): Boolean = ZkData.sensitivePath(path)
+
+  def defaultAcls(isSecure: Boolean, path: String): java.util.List[ACL] = ZkData.defaultAcls(isSecure, path).asJava
 
   def maybeDeletePath(zkUrl: String, dir: String) {
     try {
@@ -234,72 +200,25 @@ object ZkUtils {
   }
 }
 
-class ZooKeeperClientWrapper(val zkClient: ZkClient) {
-  def apply[T](method: ZkClient => T): T = method(zkClient)
-  def close(): Unit = {
-    if(zkClient != null)
-      zkClient.close()
-  }
-}
-
-class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
-    extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup {
-  private val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
-
-  override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
-    explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
-  }
-
-  override def apply[T](method: ZkClient => T): T = {
-    val startNs = time.nanoseconds
-    val ret =
-      try method(zkClient)
-      finally latencyMetric.update(TimeUnit.NANOSECONDS.toMillis(time.nanoseconds - startNs))
-    ret
-  }
-
-  override def close(): Unit = {
-    if (latencyMetric != null)
-      removeMetric("ZooKeeperRequestLatencyMs")
-    super.close()
-  }
-}
-
 /**
  * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead.
  */
-class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
+class ZkUtils(val zkClient: ZkClient,
               val zkConnection: ZkConnection,
               val isSecure: Boolean) extends Logging {
+  import ZkUtils._
+  
   // These are persistent ZK paths that should exist on kafka broker startup.
-  val persistentZkPaths = Seq(ConsumersPath,
-                              BrokerIdsPath,
-                              BrokerTopicsPath,
-                              ConfigChangesPath,
-                              getEntityConfigRootPath(ConfigType.Topic),
-                              getEntityConfigRootPath(ConfigType.Client),
-                              DeleteTopicsPath,
-                              BrokerSequenceIdPath,
-                              IsrChangeNotificationPath,
-                              ProducerIdBlockPath,
-                              LogDirEventNotificationPath)
-
-  /** Present for compatibility */
-  def this(zkClient: ZkClient, zkConnection: ZkConnection, isSecure: Boolean) =
-    this(new ZooKeeperClientWrapper(zkClient), zkConnection, isSecure)
+  val persistentZkPaths = ZkData.PersistentZkPaths
 
   // Visible for testing
-  val zkPath = new ZkPath(zkClientWrap)
-
-  import ZkUtils._
+  val zkPath = new ZkPath(zkClient)
 
   @deprecated("This is deprecated, use defaultAcls(path) which doesn't make sensitive data world readable", since = "0.10.2.1")
   val DefaultAcls: java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, "")
 
   def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path)
 
-  def zkClient: ZkClient = zkClientWrap.zkClient
-
   def getController(): Int = {
     readDataMaybeNull(ControllerPath)._1 match {
       case Some(controller) => parseControllerId(controller)
@@ -470,7 +389,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
     val brokerIdPath = BrokerIdsPath + "/" + id
     // see method documentation for reason why we do this
     val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
-    val json = new String(Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack),
+    val json = new String(BrokerIdZNode.encode(version, host, port, advertisedEndpoints, jmxPort, rack),
       StandardCharsets.UTF_8)
     registerBrokerInZk(brokerIdPath, json)
 
@@ -483,7 +402,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
                                                       brokerInfo,
                                                       zkConnection.getZookeeper,
                                                       isSecure)
-      zkClientWrap(_ => zkCheckedEphemeral.create())
+      zkCheckedEphemeral.create()
     } catch {
       case _: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
@@ -524,7 +443,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
       acls
     }
 
-    if (!zkClientWrap(zkClient => zkClient.exists(path)))
+    if (!zkClient.exists(path))
       zkPath.createPersistent(path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException
   }
 
@@ -607,7 +526,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      zkClientWrap(_.writeData(path, data))
+      zkClient.writeData(path, data)
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
@@ -615,7 +534,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
           zkPath.createPersistent(path, data, acl)
         } catch {
           case _: ZkNodeExistsException =>
-            zkClientWrap(_.writeData(path, data))
+            zkClient.writeData(path, data)
         }
     }
   }
@@ -631,7 +550,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int,
     optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
     try {
-      val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
+      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
@@ -658,7 +577,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
    */
   def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
-      val stat = zkClientWrap(_.writeDataReturnStat(path, data, expectVersion))
+      val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
@@ -678,7 +597,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
     val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
     try {
-      zkClientWrap(_.writeData(path, data))
+      zkClient.writeData(path, data)
     } catch {
       case _: ZkNoNodeException =>
         createParentPath(path)
@@ -687,7 +606,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   }
 
   def deletePath(path: String): Boolean = {
-    zkClientWrap(_.delete(path))
+    zkClient.delete(path)
   }
 
   /**
@@ -696,7 +615,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
     */
    def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
     try {
-      zkClientWrap(_.delete(path, expectedVersion))
+      zkClient.delete(path, expectedVersion)
       true
     } catch {
       case _: ZkBadVersionException => false
@@ -704,37 +623,37 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   }
 
   def deletePathRecursive(path: String) {
-    zkClientWrap(_.deleteRecursive(path))
+    zkClient.deleteRecursive(path)
   }
 
   def subscribeDataChanges(path: String, listener: IZkDataListener): Unit =
-    zkClientWrap(_.subscribeDataChanges(path, listener))
+    zkClient.subscribeDataChanges(path, listener)
 
   def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit =
-    zkClientWrap(_.unsubscribeDataChanges(path, dataListener))
+    zkClient.unsubscribeDataChanges(path, dataListener)
 
   def subscribeStateChanges(listener: IZkStateListener): Unit =
-    zkClientWrap(_.subscribeStateChanges(listener))
+    zkClient.subscribeStateChanges(listener)
 
   def subscribeChildChanges(path: String, listener: IZkChildListener): Option[Seq[String]] =
-    Option(zkClientWrap(_.subscribeChildChanges(path, listener))).map(_.asScala)
+    Option(zkClient.subscribeChildChanges(path, listener)).map(_.asScala)
 
   def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit =
-    zkClientWrap(_.unsubscribeChildChanges(path, childListener))
+    zkClient.unsubscribeChildChanges(path, childListener)
 
   def unsubscribeAll(): Unit =
-    zkClientWrap(_.unsubscribeAll())
+    zkClient.unsubscribeAll()
 
   def readData(path: String): (String, Stat) = {
     val stat: Stat = new Stat()
-    val dataStr: String = zkClientWrap(_.readData[String](path, stat))
+    val dataStr: String = zkClient.readData[String](path, stat)
     (dataStr, stat)
   }
 
   def readDataMaybeNull(path: String): (Option[String], Stat) = {
     val stat = new Stat()
     val dataAndStat = try {
-                        val dataStr = zkClientWrap(_.readData[String](path, stat))
+                        val dataStr = zkClient.readData[String](path, stat)
                         (Some(dataStr), stat)
                       } catch {
                         case _: ZkNoNodeException =>
@@ -746,18 +665,18 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   def readDataAndVersionMaybeNull(path: String): (Option[String], Int) = {
     val stat = new Stat()
     try {
-      val data = zkClientWrap(_.readData[String](path, stat))
+      val data = zkClient.readData[String](path, stat)
       (Option(data), stat.getVersion)
     } catch {
       case _: ZkNoNodeException => (None, stat.getVersion)
     }
   }
 
-  def getChildren(path: String): Seq[String] = zkClientWrap(_.getChildren(path)).asScala
+  def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
 
   def getChildrenParentMayNotExist(path: String): Seq[String] = {
     try {
-      zkClientWrap(_.getChildren(path)).asScala
+      zkClient.getChildren(path).asScala
     } catch {
       case _: ZkNoNodeException => Nil
     }
@@ -767,7 +686,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
    * Check if the given path exists
    */
   def pathExists(path: String): Boolean = {
-    zkClientWrap(_.exists(path))
+    zkClient.exists(path)
   }
 
   def isTopicMarkedForDeletion(topic: String): Boolean = {
@@ -779,11 +698,15 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
     val nodes = getChildrenParentMayNotExist(BrokerIdsPath)
     for (node <- nodes) {
       val brokerZKString = readData(BrokerIdsPath + "/" + node)._1
-      cluster.add(Broker.createBroker(node.toInt, brokerZKString))
+      cluster.add(parseBrokerJson(node.toInt, brokerZKString))
     }
     cluster
   }
 
+  private def parseBrokerJson(id: Int, jsonString: String): Broker = {
+    BrokerIdZNode.decode(id, jsonString.getBytes(StandardCharsets.UTF_8)).broker
+  }
+
   def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     for(topicAndPartition <- topicAndPartitions) {
@@ -894,9 +817,9 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
 
   def deletePartition(brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
-    zkClientWrap(_.delete(brokerIdPath))
+    zkClient.delete(brokerIdPath)
     val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
-    zkClientWrap(_.delete(brokerPartTopicPath))
+    zkClient.delete(brokerPartTopicPath)
   }
 
   @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
@@ -944,7 +867,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
    */
   def getBrokerInfo(brokerId: Int): Option[Broker] = {
     readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
-      case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo))
+      case Some(brokerInfo) => Some(parseBrokerJson(brokerId, brokerInfo))
       case None => None
     }
   }
@@ -956,7 +879,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
     */
   def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = {
     val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
-    def writeToZk: Int = zkClientWrap(_.writeDataReturnStat(path, "", -1)).getVersion
+    def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
     try {
       writeToZk
     } catch {
@@ -1020,7 +943,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
   }
 
   def close() {
-    zkClientWrap.close()
+    zkClient.close()
   }
 }
 
@@ -1040,7 +963,7 @@ private object ZKStringSerializer extends ZkSerializer {
 
 @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ZKGroupDirs(val group: String) {
-  def consumerDir = ConsumersPath
+  def consumerDir = ZkUtils.ConsumersPath
   def consumerGroupDir = consumerDir + "/" + group
   def consumerRegistryDir = consumerGroupDir + "/ids"
   def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
@@ -1076,7 +999,7 @@ class ZKConfig(props: VerifiableProperties) {
   val zkSyncTimeMs = props.getInt(ZkSyncTimeMsProp, 2000)
 }
 
-class ZkPath(clientWrap: ZooKeeperClientWrapper) {
+class ZkPath(zkClient: ZkClient) {
 
   @volatile private var isNamespacePresent: Boolean = false
 
@@ -1084,7 +1007,7 @@ class ZkPath(clientWrap: ZooKeeperClientWrapper) {
     if (isNamespacePresent)
       return
 
-    if (!clientWrap(_.exists("/"))) {
+    if (!zkClient.exists("/")) {
       throw new ConfigException("Zookeeper namespace does not exist")
     }
     isNamespacePresent = true
@@ -1096,22 +1019,22 @@ class ZkPath(clientWrap: ZooKeeperClientWrapper) {
 
   def createPersistent(path: String, data: Object, acls: java.util.List[ACL]) {
     checkNamespace()
-    clientWrap(_.createPersistent(path, data, acls))
+    zkClient.createPersistent(path, data, acls)
   }
 
   def createPersistent(path: String, createParents: Boolean, acls: java.util.List[ACL]) {
     checkNamespace()
-    clientWrap(_.createPersistent(path, createParents, acls))
+    zkClient.createPersistent(path, createParents, acls)
   }
 
   def createEphemeral(path: String, data: Object, acls: java.util.List[ACL]) {
     checkNamespace()
-    clientWrap(_.createEphemeral(path, data, acls))
+    zkClient.createEphemeral(path, data, acls)
   }
 
   def createPersistentSequential(path: String, data: Object, acls: java.util.List[ACL]): String = {
     checkNamespace()
-    clientWrap(_.createPersistentSequential(path, data, acls))
+    zkClient.createPersistentSequential(path, data, acls)
   }
 }
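
ACL handling is now centralized in ZkData and merely re-exported from
ZkUtils. A minimal sketch using the path constants defined in this file:

    // secure cluster: world-readable by default, creator-only for sensitive paths
    val brokerAcls = ZkData.defaultAcls(isSecure = true, ZkUtils.BrokerIdsPath)
    val userAcls = ZkData.defaultAcls(isSecure = true, ZkUtils.ConfigUsersPath)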
 
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 8c3f018..8179300 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.ConfigType
-import kafka.utils._
+import kafka.utils.Logging
 import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
@@ -49,7 +49,8 @@ import scala.collection.{Seq, mutable}
  * easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are
  * in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go.
  */
-class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends Logging with KafkaMetricsGroup {
+class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with
+  Logging with KafkaMetricsGroup {
 
   override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
     explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags)
@@ -75,12 +76,11 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   }
 
   def registerBrokerInZk(brokerInfo: BrokerInfo): Unit = {
-    val brokerIdPath = brokerInfo.path()
-    checkedEphemeralCreate(brokerIdPath, brokerInfo.encode())
-    info("Registered broker %d at path %s with addresses: %s".format(brokerInfo.id, brokerIdPath, brokerInfo.endpoints()))
+    val path = brokerInfo.path
+    checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
+    info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}")
   }
 
-
   /**
    * Gets topic partition states for the given partitions.
   * @param partitions the partitions for which we want to get states.
@@ -292,7 +292,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
       val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
       getDataResponse.resultCode match {
         case Code.OK =>
-          Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+          Option(BrokerIdZNode.decode(brokerId, getDataResponse.data).broker)
         case Code.NONODE => None
         case _ => throw getDataResponse.resultException.get
       }
@@ -308,7 +308,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
       case Code.OK =>
-        Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+        Option(BrokerIdZNode.decode(brokerId, getDataResponse.data).broker)
       case Code.NONODE => None
       case _ => throw getDataResponse.resultException.get
     }
@@ -453,7 +453,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
    * @param topics the topics whose partitions we wish to get the assignments for.
    * @return the partition assignment for each partition from the given topics.
    */
-  def getPartitionAssignmentForTopics(topics: Set[String]):  Map[String, Map[Int, Seq[Int]]] = {
+  def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, Seq[Int]]] = {
     val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
     val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
     getDataResponses.flatMap { getDataResponse =>
@@ -657,7 +657,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
     val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
-      case  Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data)
+      case Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data)
       case Code.NONODE => Map.empty
       case _ => throw getDataResponse.resultException.get
     }
@@ -756,13 +756,8 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
    * @param partition
    * @return optional integer if the leader exists and None otherwise.
    */
-  def getLeaderForPartition(partition: TopicPartition): Option[Int] = {
-    val leaderIsrEpoch = getTopicPartitionState(partition)
-    if (leaderIsrEpoch.isDefined)
-      Option(leaderIsrEpoch.get.leaderAndIsr.leader)
-    else
-      None
-  }
+  def getLeaderForPartition(partition: TopicPartition): Option[Int] =
+    getTopicPartitionState(partition).map(_.leaderAndIsr.leader)
 
   /**
    * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
@@ -1217,7 +1212,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
   }
 
   /**
-    * Generate a borker id by updating the broker sequence id path in ZK and return the version of the path.
+    * Generate a broker id by updating the broker sequence id path in ZK and return the version of the path.
     * The version is incremented by one on every update starting from 1.
     * @return sequence number as the broker id
     */
@@ -1351,10 +1346,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: T
     retryRequestsUntilConnected(getDataRequests)
   }
 
-  private def acls(path: String): Seq[ACL] = {
-    import scala.collection.JavaConverters._
-    ZkUtils.defaultAcls(isSecure, path).asScala
-  }
+  private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path)
 
   private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
     retryRequestsUntilConnected(Seq(request)).head
@@ -1444,4 +1436,22 @@ object KafkaZkClient {
   case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicPartition, LeaderAndIsr],
                                       partitionsToRetry: Seq[TopicPartition],
                                       failedPartitions: Map[TopicPartition, Exception])
+
+  /**
+   * Create an instance of this class with the provided parameters.
+   *
+   * The metric group and type are preserved by default for compatibility with previous versions.
+   */
+  def apply(connectString: String,
+            isSecure: Boolean,
+            sessionTimeoutMs: Int,
+            connectionTimeoutMs: Int,
+            maxInFlightRequests: Int,
+            time: Time,
+            metricGroup: String = "kafka.server",
+            metricType: String = "SessionExpireListener") = {
+    val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
+      time, metricGroup, metricType)
+    new KafkaZkClient(zooKeeperClient, isSecure, time)
+  }
 }
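
A minimal usage sketch of the new factory; the connection settings are placeholders and
brokerInfo stands for a BrokerInfo value built as in ZkData.scala below:

    import kafka.zk.KafkaZkClient
    import org.apache.kafka.common.utils.Time

    // The defaults keep the kafka.server / SessionExpireListener metric group and type.
    val zkClient = KafkaZkClient(connectString = "localhost:2181", isSecure = false,
      sessionTimeoutMs = 6000, connectionTimeoutMs = 6000, maxInFlightRequests = 10,
      time = Time.SYSTEM)
    try zkClient.registerBrokerInZk(brokerInfo) // assumes a brokerInfo value is in scope
    finally zkClient.close()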
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 9578129..2c86c2c 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -28,9 +28,15 @@ import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.server.ConfigType
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
-import org.apache.zookeeper.data.Stat
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.ZooDefs
+import org.apache.zookeeper.data.{ACL, Stat}
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
+import scala.collection.mutable.ArrayBuffer
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
 
@@ -63,42 +69,158 @@ object BrokerIdsZNode {
   def encode: Array[Byte] = null
 }
 
-class BrokerInfo(val id: Int,
-                 host: String,
-                 port: Int,
-                 advertisedEndpoints: Seq[EndPoint],
-                 jmxPort: Int,
-                 rack: Option[String],
-                 apiVersion: ApiVersion) {
+object BrokerInfo {
 
-  def path(): String = {
-    BrokerIdZNode.path(id)
+  /**
+   * Create a broker info with the v4 JSON format (which includes multiple endpoints and rack) if
+   * the apiVersion is 0.10.0.X or above. Register the broker with the v2 JSON format otherwise.
+   *
+   * Due to KAFKA-3100, a 0.9.0.0 broker and old clients will break if the JSON version is above 2.
+   *
+   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above without having to
+   * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
+   */
+  def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo = {
+    // see method documentation for the reason why we do this
+    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+    BrokerInfo(broker, version, jmxPort)
   }
 
-  def endpoints(): String = {
-    advertisedEndpoints.mkString(",")
-  }
+}
 
-  def encode(): Array[Byte] = {
-    BrokerIdZNode.encode(id, host, port, advertisedEndpoints, jmxPort, rack, apiVersion)
-  }
+case class BrokerInfo(broker: Broker, version: Int, jmxPort: Int) {
+  val path: String = BrokerIdZNode.path(broker.id)
+  def toJsonBytes: Array[Byte] = BrokerIdZNode.encode(this)
 }
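
A quick sketch of the version selection above (broker id, endpoint and rack are invented;
imports are those already present in ZkData.scala plus the ApiVersion constants):

    import kafka.api.{KAFKA_0_9_0, KAFKA_0_10_0_IV1}

    val endPoint = new EndPoint("host1", 9092,
      ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)
    val broker = Broker(1, Seq(endPoint), rack = Some("dc1"))
    BrokerInfo(broker, KAFKA_0_10_0_IV1, jmxPort = 9999).version // 4: endpoints + rack
    BrokerInfo(broker, KAFKA_0_9_0, jmxPort = 9999).version      // 2: readable by old clients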
 
 object BrokerIdZNode {
+  private val HostKey = "host"
+  private val PortKey = "port"
+  private val VersionKey = "version"
+  private val EndpointsKey = "endpoints"
+  private val RackKey = "rack"
+  private val JmxPortKey = "jmx_port"
+  private val ListenerSecurityProtocolMapKey = "listener_security_protocol_map"
+  private val TimestampKey = "timestamp"
+
   def path(id: Int) = s"${BrokerIdsZNode.path}/$id"
-  def encode(id: Int,
-             host: String,
-             port: Int,
-             advertisedEndpoints: Seq[EndPoint],
-             jmxPort: Int,
-             rack: Option[String],
-             apiVersion: ApiVersion): Array[Byte] = {
-    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
-    Broker.toJsonBytes(version, id, host, port, advertisedEndpoints, jmxPort, rack)
+
+  /**
+   * Encode to JSON bytes.
+   *
+   * The JSON format includes a top level host and port for compatibility with older clients.
+   */
+  def encode(version: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
+             rack: Option[String]): Array[Byte] = {
+    val jsonMap = collection.mutable.Map(VersionKey -> version,
+      HostKey -> host,
+      PortKey -> port,
+      EndpointsKey -> advertisedEndpoints.map(_.connectionString).toBuffer.asJava,
+      JmxPortKey -> jmxPort,
+      TimestampKey -> Time.SYSTEM.milliseconds().toString
+    )
+    rack.foreach(rack => if (version >= 3) jsonMap += (RackKey -> rack))
+
+    if (version >= 4) {
+      jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint =>
+        endPoint.listenerName.value -> endPoint.securityProtocol.name
+      }.toMap.asJava)
+    }
+    Json.encodeAsBytes(jsonMap.asJava)
+  }
+
+  def encode(brokerInfo: BrokerInfo): Array[Byte] = {
+    val broker = brokerInfo.broker
+    // the default host and port are here for compatibility with older clients that only support PLAINTEXT
+    // we choose the first plaintext port, if there is one
+    // or we register an empty endpoint, which means that older clients will not be able to connect
+    val plaintextEndpoint = broker.endPoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse(
+      new EndPoint(null, -1, null, null))
+    encode(brokerInfo.version, plaintextEndpoint.host, plaintextEndpoint.port, broker.endPoints, brokerInfo.jmxPort,
+      broker.rack)
   }
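
To illustrate the PLAINTEXT fallback above with an invented SSL-only broker: the top-level
host and port encode as null and -1, so clients that only understand those fields cannot connect:

    val sslOnly = Broker(2, Seq(new EndPoint("host1", 9093,
      ListenerName.forSecurityProtocol(SecurityProtocol.SSL), SecurityProtocol.SSL)), rack = None)
    BrokerIdZNode.encode(BrokerInfo(sslOnly, version = 4, jmxPort = 9999))
    // the resulting JSON contains "host":null and "port":-1 alongside the SSL endpoint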
 
-  def decode(id: Int, bytes: Array[Byte]): Broker = {
-    Broker.createBroker(id, new String(bytes, UTF_8))
+  /**
+    * Create a BrokerInfo object from id and JSON bytes.
+    *
+    * @param id the broker id
+    * @param jsonBytes the JSON bytes stored in the broker's znode
+    *
+    * Version 1 JSON schema for a broker is:
+    * {
+    *   "version":1,
+    *   "host":"localhost",
+    *   "port":9092
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666"
+    * }
+    *
+    * Version 2 JSON schema for a broker is:
+    * {
+    *   "version":2,
+    *   "host":"localhost",
+    *   "port":9092,
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666",
+    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
+    * }
+    *
+    * Version 3 JSON schema for a broker is:
+    * {
+    *   "version":3,
+    *   "host":"localhost",
+    *   "port":9092,
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666",
+    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
+    *   "rack":"dc1"
+    * }
+    *
+    * Version 4 (current) JSON schema for a broker is:
+    * {
+    *   "version":4,
+    *   "host":"localhost",
+    *   "port":9092,
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666",
+    *   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
+    *   "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
+    *   "rack":"dc1"
+    * }
+    */
+  def decode(id: Int, jsonBytes: Array[Byte]): BrokerInfo = {
+    Json.tryParseBytes(jsonBytes) match {
+      case Right(js) =>
+        val brokerInfo = js.asJsonObject
+        val version = brokerInfo(VersionKey).to[Int]
+        val jmxPort = brokerInfo(JmxPortKey).to[Int]
+
+        val endpoints =
+          if (version < 1)
+            throw new KafkaException("Unsupported version of broker registration: " +
+              s"${new String(jsonBytes, UTF_8)}")
+          else if (version == 1) {
+            val host = brokerInfo(HostKey).to[String]
+            val port = brokerInfo(PortKey).to[Int]
+            val securityProtocol = SecurityProtocol.PLAINTEXT
+            val endPoint = new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
+            Seq(endPoint)
+          }
+          else {
+            val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map(
+              _.to[Map[String, String]].map { case (listenerName, securityProtocol) =>
+                new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
+              })
+            val listeners = brokerInfo(EndpointsKey).to[Seq[String]]
+            listeners.map(EndPoint.createEndPoint(_, securityProtocolMap))
+          }
+
+        val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]])
+        BrokerInfo(Broker(id, endpoints, rack), version, jmxPort)
+      case Left(e) =>
+        throw new KafkaException(s"Failed to parse ZooKeeper registration for broker $id: " +
+          s"${new String(jsonBytes, UTF_8)}", e)
+    }
   }
 }
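
A round-trip sketch for the v4 schema documented above (all values invented):

    val v4Json =
      """{"version":4,"host":"host1","port":9092,"jmx_port":9999,
        |"timestamp":"2233345666","endpoints":["PLAINTEXT://host1:9092"],
        |"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
        |"rack":"dc1"}""".stripMargin
    val info = BrokerIdZNode.decode(1, v4Json.getBytes(UTF_8))
    // info.version == 4, info.jmxPort == 9999, info.broker.rack == Some("dc1")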
 
@@ -342,8 +464,12 @@ object AclChangeNotificationSequenceZNode {
   def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
 }
 
+object ClusterZNode {
+  def path = "/cluster"
+}
+
 object ClusterIdZNode {
-  def path = "/cluster/id"
+  def path = s"${ClusterZNode.path}/id"
 
   def toJson(id: String): Array[Byte] = {
     Json.encodeAsBytes(Map("version" -> "1", "id" -> id).asJava)
@@ -365,17 +491,46 @@ object ProducerIdBlockZNode {
 }
 
 object ZkData {
+
+  // Important: any new top-level ZooKeeper path must be added to this Seq
+  val SecureRootPaths = Seq(AdminZNode.path,
+    BrokersZNode.path,
+    ClusterZNode.path,
+    ConfigZNode.path,
+    ControllerZNode.path,
+    ControllerEpochZNode.path,
+    IsrChangeNotificationZNode.path,
+    AclZNode.path,
+    AclChangeNotificationZNode.path,
+    ProducerIdBlockZNode.path,
+    LogDirEventNotificationZNode.path)
+
   // These are persistent ZK paths that should exist on kafka broker startup.
   val PersistentZkPaths = Seq(
     "/consumers",  // old consumer path
     BrokerIdsZNode.path,
     TopicsZNode.path,
     ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityTypeZNode.path(ConfigType.Topic),
-    ConfigEntityTypeZNode.path(ConfigType.Client),
     DeleteTopicsZNode.path,
     BrokerSequenceIdZNode.path,
     IsrChangeNotificationZNode.path,
     ProducerIdBlockZNode.path,
-    LogDirEventNotificationZNode.path)
-}
\ No newline at end of file
+    LogDirEventNotificationZNode.path
+  ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)
+
+  val SensitiveRootPaths = Seq(ConfigEntityTypeZNode.path(ConfigType.User))
+
+  def sensitivePath(path: String): Boolean = {
+    path != null && SensitiveRootPaths.exists(path.startsWith)
+  }
+
+  def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
+    if (isSecure) {
+      val acls = new ArrayBuffer[ACL]
+      acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
+      if (!sensitivePath(path))
+        acls ++= ZooDefs.Ids.READ_ACL_UNSAFE.asScala
+      acls
+    } else ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala
+  }
+}
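
The effect of defaultAcls, sketched with example paths (only paths under /config/users are
sensitive, per SensitiveRootPaths):

    ZkData.defaultAcls(isSecure = true, path = "/config/users/alice") // CREATOR_ALL_ACL only
    ZkData.defaultAcls(isSecure = true, path = "/brokers/ids/1")      // CREATOR_ALL_ACL ++ READ_ACL_UNSAFE
    ZkData.defaultAcls(isSecure = false, path = "/brokers/ids/1")     // OPEN_ACL_UNSAFE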
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6d786dc..9a1d162 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -49,8 +49,8 @@ class ZooKeeperClient(connectString: String,
                       connectionTimeoutMs: Int,
                       maxInFlightRequests: Int,
                       time: Time,
-                      metricGroup: String = "kafka.server",
-                      metricType: String = "KafkaHealthcheck") extends Logging with KafkaMetricsGroup {
+                      metricGroup: String,
+                      metricType: String) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[ZooKeeperClient] "
   private val initializationLock = new ReentrantReadWriteLock()
   private val isConnectedOrExpiredLock = new ReentrantLock()
@@ -81,27 +81,27 @@ class ZooKeeperClient(connectString: String,
   }
 
   info(s"Initializing a new session to $connectString.")
+  // Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
   @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
 
-  private val sessionStateGauge =
-    newGauge("SessionState", new Gauge[String] {
-      override def value: String =
-        Option(zooKeeper.getState.toString).getOrElse("DISCONNECTED")
-    })
+  newGauge("SessionState", new Gauge[String] {
+    override def value: String = Option(connectionState.toString).getOrElse("DISCONNECTED")
+  })
 
   metricNames += "SessionState"
 
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
 
-
-  /**
-    * This is added to preserve the original metric name in JMX
-    */
   override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
     explicitMetricName(metricGroup, metricType, name, metricTags)
   }
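
With the group and type now passed in explicitly (and defaulted in KafkaZkClient.apply), the
session gauges register under MBean names such as:

    kafka.server:type=SessionExpireListener,name=SessionState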
 
   /**
+   * Return the state of the ZooKeeper connection.
+   */
+  def connectionState: States = zooKeeper.getState
+
+  /**
    * Send a request and wait for its response. See handle(Seq[AsyncRequest]) for details.
    *
    * @param request a single request to send and wait on.
@@ -214,13 +214,13 @@ class ZooKeeperClient(connectString: String,
     info("Waiting until connected.")
     var nanos = timeUnit.toNanos(timeout)
     inLock(isConnectedOrExpiredLock) {
-      var state = zooKeeper.getState
+      var state = connectionState
       while (!state.isConnected && state.isAlive) {
         if (nanos <= 0) {
           throw new ZooKeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state")
         }
         nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
-        state = zooKeeper.getState
+        state = connectionState
       }
       if (state == States.AUTH_FAILED) {
         throw new ZooKeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
@@ -309,24 +309,23 @@ class ZooKeeperClient(connectString: String,
   def sessionId: Long = inReadLock(initializationLock) {
     zooKeeper.getSessionId
   }
-
+
   private def initialize(): Unit = {
-    if (!zooKeeper.getState.isAlive) {
+    if (!connectionState.isAlive) {
+      zooKeeper.close()
       info(s"Initializing a new session to $connectString.")
       // retry forever until ZooKeeper can be instantiated
-      while (true) {
+      var connected = false
+      while (!connected) {
         try {
-          zooKeeper.close()
           zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
-          return
+          connected = true
         } catch {
           case e: Exception =>
-            info("Error when recreating ZooKeeper", e)
+            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
             Thread.sleep(1000)
         }
       }
-      info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}")
-      stateChangeHandlers.values.foreach(_.onReconnectionTimeout())
     }
   }
 
@@ -341,7 +340,7 @@ class ZooKeeperClient(connectString: String,
   // package level visibility for testing only
   private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
     override def process(event: WatchedEvent): Unit = {
-      debug("Received event: " + event)
+      debug(s"Received event: $event")
       Option(event.getPath) match {
         case None =>
           val state = event.getState
@@ -377,7 +376,6 @@ trait StateChangeHandler {
   def beforeInitializingSession(): Unit = {}
   def afterInitializingSession(): Unit = {}
   def onAuthFailure(): Unit = {}
-  def onReconnectionTimeout(): Unit = {}
 }
 
 trait ZNodeChangeHandler {
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index daeb82a..cac102c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -255,7 +255,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   def testDescribeLogDirs(): Unit = {
     client = AdminClient.create(createConfig())
     val topic = "topic"
-    val leaderByPartition = TestUtils.createTopic(zkClient, topic, 10, 1, servers, new Properties())
+    val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
     val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq)
     val brokers = (0 until serverCount).map(Integer.valueOf)
     val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
@@ -281,8 +281,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   def testDescribeReplicaLogDirs(): Unit = {
     client = AdminClient.create(createConfig())
     val topic = "topic"
-    val leaderByPartition = TestUtils.createTopic(zkClient, topic, 10, 1, servers, new Properties())
-    val replicas = leaderByPartition.map { case (partition, brokerId) => new TopicPartitionReplica(topic, partition, brokerId) }.toSeq
+    val leaderByPartition = createTopic(topic, numPartitions = 10, replicationFactor = 1)
+    val replicas = leaderByPartition.map { case (partition, brokerId) =>
+      new TopicPartitionReplica(topic, partition, brokerId)
+    }.toSeq
 
     val replicaDirInfos = client.describeReplicaLogDirs(replicas.asJavaCollection).all.get
     replicaDirInfos.asScala.foreach { case (topicPartitionReplica, replicaDirInfo) =>
@@ -317,7 +319,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       assertTrue(exception.getCause.isInstanceOf[ReplicaNotAvailableException])
     }
 
-    TestUtils.createTopic(zkClient, topic, 1, serverCount, servers, new Properties)
+    createTopic(topic, numPartitions = 1, replicationFactor = serverCount)
     servers.foreach { server =>
       val logDir = server.logManager.getLog(tp).get.dir.getParent
       assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
@@ -389,11 +391,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val topicConfig1 = new Properties
     topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
     topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
-    TestUtils.createTopic(zkClient, topic1, 1, 1, servers, topicConfig1)
+    createTopic(topic1, numPartitions = 1, replicationFactor = 1, topicConfig1)
 
     val topic2 = "describe-alter-configs-topic-2"
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    TestUtils.createTopic(zkClient, topic2, 1, 1, servers, new Properties)
+    createTopic(topic2, numPartitions = 1, replicationFactor = 1)
 
     // Describe topics and broker
     val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
@@ -447,7 +449,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     assertEquals(servers(2).config.logCleanerThreads.toString,
       configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
 
-    checkValidAlterConfigs(servers, client, topicResource1, topicResource2)
+    checkValidAlterConfigs(client, topicResource1, topicResource2)
   }
 
   @Test
@@ -456,10 +458,10 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
     // Create topics
     val topic1 = "create-partitions-topic-1"
-    TestUtils.createTopic(zkClient, topic1, 1, 1, servers, new Properties)
+    createTopic(topic1, numPartitions = 1, replicationFactor = 1)
 
     val topic2 = "create-partitions-topic-2"
-    TestUtils.createTopic(zkClient, topic2, 1, 2, servers, new Properties)
+    createTopic(topic2, numPartitions = 1, replicationFactor = 2)
 
     // assert that both the topics have 1 partition
     assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
@@ -714,7 +716,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testSeekAfterDeleteRecords(): Unit = {
-    TestUtils.createTopic(zkClient, topic, 2, serverCount, servers)
+    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
 
     client = AdminClient.create(createConfig)
 
@@ -741,7 +743,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   @Ignore // Disabled temporarily until flakiness is resolved
   def testLogStartOffsetCheckpoint(): Unit = {
-    TestUtils.createTopic(zkClient, topic, 2, serverCount, servers)
+    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
 
     client = AdminClient.create(createConfig)
 
@@ -780,7 +782,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testLogStartOffsetAfterDeleteRecords(): Unit = {
-    TestUtils.createTopic(zkClient, topic, 2, serverCount, servers)
+    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
 
     client = AdminClient.create(createConfig)
 
@@ -799,7 +801,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testOffsetsForTimesAfterDeleteRecords(): Unit = {
-    TestUtils.createTopic(zkClient, topic, 2, serverCount, servers)
+    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
 
     client = AdminClient.create(createConfig)
 
@@ -942,8 +944,7 @@ object AdminClientIntegrationTest {
 
   import org.scalatest.Assertions._
 
-  def checkValidAlterConfigs(servers: Seq[KafkaServer], client: AdminClient,
-                             topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
+  def checkValidAlterConfigs(client: AdminClient, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = {
     // Alter topics
     var topicConfigEntries1 = Seq(
       new ConfigEntry(LogConfig.FlushMsProp, "1000")
@@ -1009,11 +1010,11 @@ object AdminClientIntegrationTest {
     // Create topics
     val topic1 = "invalid-alter-configs-topic-1"
     val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    TestUtils.createTopic(zkClient, topic1, 1, 1, servers, new Properties())
+    TestUtils.createTopic(zkClient, topic1, 1, 1, servers)
 
     val topic2 = "invalid-alter-configs-topic-2"
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    TestUtils.createTopic(zkClient, topic2, 1, 1, servers, new Properties)
+    TestUtils.createTopic(zkClient, topic2, 1, 1, servers)
 
     val topicConfigEntries1 = Seq(
       new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index a381c84..1bea039 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -76,13 +76,13 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     val topicConfig1 = new Properties
     topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000")
     topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000")
-    TestUtils.createTopic(zkClient, topic1, 1, 1, servers, topicConfig1)
+    createTopic(topic1, 1, 1, topicConfig1)
 
     val topic2 = "describe-alter-configs-topic-2"
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    TestUtils.createTopic(zkClient, topic2, 1, 1, servers, new Properties)
+    createTopic(topic2, 1, 1)
 
-    AdminClientIntegrationTest.checkValidAlterConfigs(servers, client, topicResource1, topicResource2)
+    AdminClientIntegrationTest.checkValidAlterConfigs(client, topicResource1, topicResource2)
   }
 
   @Test
@@ -98,15 +98,15 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
     // Create topics
     val topic1 = "invalid-alter-configs-due-to-policy-topic-1"
     val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
-    TestUtils.createTopic(zkClient, topic1, 1, 1, servers, new Properties())
+    createTopic(topic1, 1, 1)
 
     val topic2 = "invalid-alter-configs-due-to-policy-topic-2"
     val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
-    TestUtils.createTopic(zkClient, topic2, 1, 1, servers, new Properties)
+    createTopic(topic2, 1, 1)
 
     val topic3 = "invalid-alter-configs-due-to-policy-topic-3"
     val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3)
-    TestUtils.createTopic(zkClient, topic3, 1, 1, servers, new Properties)
+    createTopic(topic3, 1, 1)
 
     val topicConfigEntries1 = Seq(
       new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"),
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index a11dd8c..3521bb6 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,7 +19,7 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils.ShutdownableThread
 import kafka.server.KafkaConfig
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -68,7 +68,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     super.setUp()
 
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(zkClient, topic, 2, serverCount, this.servers)
+    createTopic(topic, 2, serverCount)
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 579c8bb..106984c 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -123,7 +123,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      createTopic(topic, 1, 2)
 
       // send a normal record
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8),
@@ -184,7 +184,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
                               timeoutMs: Long = 20000L) {
     val partition = 0
     try {
-      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      createTopic(topic, 1, 2)
 
       val futures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -239,7 +239,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
       else
         topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "CreateTime")
-      TestUtils.createTopic(zkClient, topic, 1, 2, servers, topicProps)
+      createTopic(topic, 1, 2, topicProps)
 
       val recordAndFutures = for (i <- 1 to numRecords) yield {
         val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -271,7 +271,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     try {
       // create topic
-      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+      createTopic(topic, 1, 2)
 
       // non-blocking send a list of records
       val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
@@ -303,7 +303,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val producer = createProducer(brokerList)
 
     try {
-      TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      createTopic(topic, 2, 2)
       val partition = 1
 
       val now = System.currentTimeMillis()
@@ -348,7 +348,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val producer = createProducer(brokerList)
 
     // create topic
-    TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+    createTopic(topic, 1, 2)
     val partition0 = 0
 
     var futures0 = (1 to numRecords).map { i =>
@@ -410,7 +410,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   def testFlush() {
     val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
     try {
-      TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      createTopic(topic, 2, 2)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
         "value".getBytes(StandardCharsets.UTF_8))
       for (_ <- 0 until 50) {
@@ -429,7 +429,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testCloseWithZeroTimeoutFromCallerThread() {
-    TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+    createTopic(topic, 2, 2)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
     val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
@@ -459,7 +459,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testCloseWithZeroTimeoutFromSenderThread() {
-    TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+    createTopic(topic, 1, 2)
     val partition = 0
     consumer.assign(List(new TopicPartition(topic, partition)).asJava)
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 257308c..9b1c2aa 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -72,7 +72,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     super.setUp()
 
     val numPartitions = 1
-    val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, serverCount, servers)
+    val leaders = createTopic(topic1, numPartitions, serverCount)
     leaderNode = if (leaders(0) == servers.head.config.brokerId) servers.head else servers(1)
     followerNode = if (leaders(0) != servers.head.config.brokerId) servers.head else servers(1)
   }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 5fca9b4..8917921 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -69,7 +69,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     super.setUp()
 
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(zkClient, topic, 1, serverCount, this.servers)
+    createTopic(topic, 1, serverCount)
   }
 
   @After
@@ -288,7 +288,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   @Test
   def testCloseDuringRebalance() {
     val topic = "closetest"
-    TestUtils.createTopic(zkClient, topic, 10, serverCount, this.servers)
+    createTopic(topic, 10, serverCount)
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index d997db5..1e93b37 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -161,7 +161,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*"))
     }
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(zkClient, topic, 1, 3, this.servers)
+    createTopic(topic, 1, 3)
   }
 
   override def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 8022170..eff596b 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -115,7 +115,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
     super.setUp()
     MockDeserializer.resetStaticVariables
     // create the consumer offset topic
-    TestUtils.createTopic(zkClient, topic, 2, serverCount, this.servers)
+    createTopic(topic, 2, serverCount)
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index 774db18..57a2b20 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -69,7 +69,7 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
   override def setUp() {
     super.setUp()
     client = AdminClient.createSimplePlaintext(this.brokerList)
-    TestUtils.createTopic(zkClient, topic, 2, serverCount, this.servers)
+    createTopic(topic, 2, serverCount)
   }
 
   @After
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index 9c5a9c8..eaa4a23 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -48,7 +48,7 @@ class LogAppendTimeTest extends IntegrationTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    TestUtils.createTopic(zkClient, topic, servers = servers)
+    createTopic(topic)
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 6e95973..80cfeca 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -71,7 +71,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     val topic = "topicWithOldMessageFormat"
     val props = new Properties
     props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
-    TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, this.servers, props)
+    createTopic(topic, numPartitions = 1, replicationFactor = 1, props)
     val tp = new TopicPartition(topic, 0)
 
     // Produce and consume some records
@@ -206,9 +206,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
 
   private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
     // Latency is rounded to milliseconds, so check the count instead.
-    val initialCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+    val initialCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
     servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
-    val newCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long]
+    val newCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
     assertTrue("ZooKeeper latency not recorded",  newCount > initialCount)
 
     assertEquals(s"Unexpected ZK state", "CONNECTED", yammerMetricValue("SessionState"))
@@ -274,7 +274,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     }
   }
 
-  private def yammerHistogramCount(name: String): Any = {
+  private def yammerHistogramCount(name: String): Long = {
     val allMetrics = Metrics.defaultRegistry.allMetrics.asScala
     val (_, metric) = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
       .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}"))
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 65b2865..04935f8 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -325,17 +325,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     sendRecords(numRecords)
 
     val topic1 = "tblablac" // matches subscribed pattern
-    TestUtils.createTopic(zkClient, topic1, 2, serverCount, this.servers)
+    createTopic(topic1, 2, serverCount)
     sendRecords(1000, new TopicPartition(topic1, 0))
     sendRecords(1000, new TopicPartition(topic1, 1))
 
     val topic2 = "tblablak" // does not match subscribed pattern
-    TestUtils.createTopic(zkClient, topic2, 2, serverCount, this.servers)
+    createTopic(topic2, 2, serverCount)
     sendRecords(1000, new TopicPartition(topic2, 0))
     sendRecords(1000, new TopicPartition(topic2, 1))
 
     val topic3 = "tblab1" // does not match subscribed pattern
-    TestUtils.createTopic(zkClient, topic3, 2, serverCount, this.servers)
+    createTopic(topic3, 2, serverCount)
     sendRecords(1000, new TopicPartition(topic3, 0))
     sendRecords(1000, new TopicPartition(topic3, 1))
 
@@ -357,7 +357,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment()}")
 
     val topic4 = "tsomec" // matches subscribed pattern
-    TestUtils.createTopic(zkClient, topic4, 2, serverCount, this.servers)
+    createTopic(topic4, 2, serverCount)
     sendRecords(1000, new TopicPartition(topic4, 0))
     sendRecords(1000, new TopicPartition(topic4, 1))
 
@@ -396,7 +396,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // the first topic ('topic')  matches first subscription pattern only
 
     val fooTopic = "foo" // matches both subscription patterns
-    TestUtils.createTopic(zkClient, fooTopic, 1, serverCount, this.servers)
+    createTopic(fooTopic, 1, serverCount)
     sendRecords(1000, new TopicPartition(fooTopic, 0))
 
     assertEquals(0, consumer0.assignment().size)
@@ -416,7 +416,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer0.assignment()}")
 
     val barTopic = "bar" // matches the next subscription pattern
-    TestUtils.createTopic(zkClient, barTopic, 1, serverCount, this.servers)
+    createTopic(barTopic, 1, serverCount)
     sendRecords(1000, new TopicPartition(barTopic, 0))
 
     val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
@@ -453,7 +453,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     sendRecords(numRecords)
 
     val topic1 = "tblablac" // matches the subscription pattern
-    TestUtils.createTopic(zkClient, topic1, 2, serverCount, this.servers)
+    createTopic(topic1, 2, serverCount)
     sendRecords(1000, new TopicPartition(topic1, 0))
     sendRecords(1000, new TopicPartition(topic1, 1))
 
@@ -525,7 +525,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       this.consumers.head.assignment == subscriptions.asJava
     }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers.head.assignment}")
 
-    TestUtils.createTopic(zkClient, otherTopic, 2, serverCount, this.servers)
+    createTopic(otherTopic, 2, serverCount)
     this.consumers.head.subscribe(List(topic, otherTopic).asJava)
     TestUtils.waitUntilTrue(() => {
       this.consumers.head.poll(50)
@@ -536,7 +536,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testShrinkingTopicSubscriptions() {
     val otherTopic = "other"
-    TestUtils.createTopic(zkClient, otherTopic, 2, serverCount, this.servers)
+    createTopic(otherTopic, 2, serverCount)
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
     val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
     this.consumers.head.subscribe(List(topic, otherTopic).asJava)
@@ -555,7 +555,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testPartitionsFor() {
     val numParts = 2
-    TestUtils.createTopic(zkClient, "part-test", numParts, 1, this.servers)
+    createTopic("part-test", numParts, 1)
     val parts = this.consumers.head.partitionsFor("part-test")
     assertNotNull(parts)
     assertEquals(2, parts.size)
@@ -788,7 +788,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val partitionCount = 30
     val topics = Seq(topic1, topic2, topic3)
     topics.foreach { topicName =>
-      TestUtils.createTopic(zkClient, topicName, partitionCount, serverCount, servers)
+      createTopic(topicName, partitionCount, serverCount)
     }
 
     val partitions = topics.flatMap { topic =>
@@ -1049,7 +1049,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testAutoCommitIntercept() {
     val topic2 = "topic2"
-    TestUtils.createTopic(zkClient, topic2, 2, serverCount, this.servers)
+    createTopic(topic2, 2, serverCount)
 
     // produce records
     val numRecords = 100
@@ -1145,7 +1145,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val topicName = "testConsumeMessagesWithLogAppendTime"
     val topicProps = new Properties()
     topicProps.setProperty(LogConfig.MessageTimestampTypeProp, "LogAppendTime")
-    TestUtils.createTopic(zkClient, topicName, 2, 2, servers, topicProps)
+    createTopic(topicName, 2, 2, topicProps)
 
     val startTime = System.currentTimeMillis()
     val numRecords = 50
@@ -1171,9 +1171,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val topic1 = "part-test-topic-1"
     val topic2 = "part-test-topic-2"
     val topic3 = "part-test-topic-3"
-    TestUtils.createTopic(zkClient, topic1, numParts, 1, this.servers)
-    TestUtils.createTopic(zkClient, topic2, numParts, 1, this.servers)
-    TestUtils.createTopic(zkClient, topic3, numParts, 1, this.servers)
+    createTopic(topic1, numParts, 1)
+    createTopic(topic2, numParts, 1)
+    createTopic(topic3, numParts, 1)
 
     val topics = this.consumers.head.listTopics()
     assertNotNull(topics)
@@ -1192,10 +1192,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val topic3 = "part-test-topic-3"
     val props = new Properties()
     props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
-    TestUtils.createTopic(zkClient, topic1, numParts, 1, this.servers)
+    createTopic(topic1, numParts, 1)
     // Topic2 is in old message format.
-    TestUtils.createTopic(zkClient, topic2, numParts, 1, this.servers, props)
-    TestUtils.createTopic(zkClient, topic3, numParts, 1, this.servers)
+    createTopic(topic2, numParts, 1, props)
+    createTopic(topic3, numParts, 1)
 
     val consumer = this.consumers.head
 
@@ -1242,7 +1242,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     createTopicAndSendRecords(topicName = topic0, numPartitions = 2, recordsPerPartition = 100)
     val props = new Properties()
     props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
-    TestUtils.createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, this.servers, props)
+    createTopic(topic1, numPartitions = 1, replicationFactor = 1, props)
     sendRecords(100, new TopicPartition(topic1, 0))
 
     val t0p0 = new TopicPartition(topic0, 0)
@@ -1332,7 +1332,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testAutoCommitOnRebalance() {
     val topic2 = "topic2"
-    TestUtils.createTopic(zkClient, topic2, 2, serverCount, this.servers)
+    createTopic(topic2, 2, serverCount)
 
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
@@ -1380,7 +1380,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testPerPartitionLagMetricsCleanUpWithSubscribe() {
     val numMessages = 1000
     val topic2 = "topic2"
-    TestUtils.createTopic(zkClient, topic2, 2, serverCount, this.servers)
+    createTopic(topic2, 2, serverCount)
     // send some messages.
     sendRecords(numMessages, tp)
     // Test subscribe
@@ -1551,7 +1551,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
    * records to each partition
    */
   def createTopicAndSendRecords(topicName: String, numPartitions: Int, recordsPerPartition: Int): Set[TopicPartition] = {
-    TestUtils.createTopic(zkClient, topicName, numPartitions, serverCount, this.servers)
+    createTopic(topicName, numPartitions, serverCount)
     var parts = Set[TopicPartition]()
     for (partition <- 0 until numPartitions) {
       val tp = new TopicPartition(topicName, partition)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 6425c1e..929dbe4 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -89,7 +89,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   def testSendWithInvalidCreateTime() {
     val topicProps = new Properties()
     topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
-    TestUtils.createTopic(zkClient, topic, 1, 2, servers, topicProps)
+    createTopic(topic, 1, 2, topicProps)
 
     val producer = createProducer(brokerList = brokerList)
     try {
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 92ccf49..e3514cd 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -68,10 +68,10 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     val numPartitions = 3
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
-    TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers, topicConfig)
+    createTopic(topic1, numPartitions, numServers, topicConfig)
 
     val scheduler = new ProducerScheduler()
-    scheduler.start
+    scheduler.start()
 
     // rolling bounce brokers
 
@@ -92,7 +92,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
       (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition))
     }
 
-    scheduler.shutdown
+    scheduler.shutdown()
 
     // Make sure the producer does not see any exceptions
     // when draining the remaining messages on shutdown
diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index ea4b05c..4bce8e3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -62,7 +62,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both,
       JaasTestUtils.KafkaServerContextName))
     super.setUp()
-    TestUtils.createTopic(zkClient, topic, numPartitions, serverCount, this.servers)
+    createTopic(topic, numPartitions, serverCount)
   }
 
   @After
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index da6ca1f..110f609 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -169,8 +169,8 @@ class TransactionsBounceTest extends KafkaServerTestHarness {
   private def createTopics() =  {
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
-    TestUtils.createTopic(zkClient, inputTopic, numPartitions, 3, servers, topicConfig)
-    TestUtils.createTopic(zkClient, outputTopic, numPartitions, 3, servers, topicConfig)
+    createTopic(inputTopic, numPartitions, 3, topicConfig)
+    createTopic(outputTopic, numPartitions, 3, topicConfig)
   }
 
   private class BounceScheduler extends ShutdownableThread("daemon-broker-bouncer", false) {
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index d50dd33..911808a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -60,8 +60,8 @@ class TransactionsTest extends KafkaServerTestHarness {
     val numPartitions = 4
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
-    TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers, topicConfig)
-    TestUtils.createTopic(zkClient, topic2, numPartitions, numServers, servers, topicConfig)
+    createTopic(topic1, numPartitions, numServers, topicConfig)
+    createTopic(topic2, numPartitions, numServers, topicConfig)
 
     for (_ <- 0 until transactionalProducerCount)
       createTransactionalProducer("transactional-producer")
@@ -503,8 +503,8 @@ class TransactionsTest extends KafkaServerTestHarness {
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
 
-    TestUtils.createTopic(zkClient, topicWith10Partitions, 10, numServers, servers, topicConfig)
-    TestUtils.createTopic(zkClient, topicWith10PartitionsAndOneReplica, 10, 1, servers, new Properties())
+    createTopic(topicWith10Partitions, 10, numServers, topicConfig)
+    createTopic(topicWith10PartitionsAndOneReplica, 10, 1, new Properties())
 
     firstProducer.initTransactions()
 
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 1f07ce4..3b5f888 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -142,7 +142,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => p -> zkUtils.getReplicasForPartition(topic, p)).toMap
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for(i <- 0 until actualReplicaList.size)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index 78bbe09..8a731cf 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -36,7 +36,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val groupToDelete = "groupToDelete"
     val otherGroup = "otherGroup"
 
-    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    createTopic(topic, 1, 3)
     fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
     fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
 
@@ -54,7 +54,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val groupToDelete = "groupToDelete"
     val otherGroup = "otherGroup"
 
-    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    createTopic(topic, 1, 3)
     fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true)
     fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
 
@@ -71,7 +71,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val topic = "test"
     val groupToDelete = "groupToDelete"
     val otherGroup = "otherGroup"
-    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    createTopic(topic, 1, 3)
     fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false)
     fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false)
 
@@ -89,8 +89,8 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val otherTopic = "otherTopic"
     val groupToDelete = "groupToDelete"
     val otherGroup = "otherGroup"
-    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
-    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+    createTopic(topicToDelete, 1, 3)
+    createTopic(otherTopic, 1, 3)
 
     fillInConsumerGroupInfo(topicToDelete, groupToDelete, "consumer", 0, 10, false)
     fillInConsumerGroupInfo(otherTopic, groupToDelete, "consumer", 0, 10, false)
@@ -111,8 +111,8 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val topicToDelete = "topicToDelete"
     val otherTopic = "otherTopic"
     val group = "group"
-    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
-    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+    createTopic(topicToDelete, 1, 3)
+    createTopic(otherTopic, 1, 3)
 
     fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true)
     fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true)
@@ -131,8 +131,8 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val otherTopic = "otherTopic"
     val groups = Seq("group1", "group2")
 
-    TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers)
-    TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers)
+    createTopic(topicToDelete, 1, 3)
+    createTopic(otherTopic, 1, 3)
     val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete))
     val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic))
     groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false))
@@ -151,7 +151,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     val topic = "topic"
     val group = "group"
 
-    TestUtils.createTopic(zkClient, topic, 1, 3, servers)
+    createTopic(topic, 1, 3)
     val dir = new ZKGroupTopicDirs(group, topic)
     fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false)
 
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 593b9cc..897cc59 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -24,7 +24,7 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import java.util.Properties
 
-import kafka.common.{KafkaException, TopicAlreadyMarkedForDeletionException}
+import kafka.common.TopicAlreadyMarkedForDeletionException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 6325060..43ec6c6 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -439,7 +439,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     for (describeType <- describeTypes) {
       val group = this.group + describeType.mkString("")
       // run one consumer in the group consuming from a single-partition topic
-      val executor = addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic))
+      addConsumerGroupExecutor(new ConsumerGroupExecutor(brokerList, 2, group, topic))
       val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) ++ describeType
       val service = getConsumerGroupService(cgcArgs)
 
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 7347fa3..4ab2563 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -32,7 +32,7 @@ import org.junit.{Before, Test}
 import org.junit.Assert.{assertEquals, assertNull}
 
 import scala.collection.{Seq, mutable}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.kafka.common.TopicPartition
 
 class ReassignPartitionsCommandTest  extends ZooKeeperTestHarness  with Logging {
@@ -270,7 +270,7 @@ class ReassignPartitionsCommandTest  extends ZooKeeperTestHarness  with Logging
     assigner.maybeLimit(Throttle(1000))
 
     //Then
-    for (actual <- propsCapture.getValues) {
+    for (actual <- propsCapture.getValues.asScala) {
       assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
       assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
     }
@@ -304,7 +304,7 @@ class ReassignPartitionsCommandTest  extends ZooKeeperTestHarness  with Logging
     assigner.maybeLimit(Throttle(1000))
 
     //Then
-    for (actual <- propsCapture.getValues) {
+    for (actual <- propsCapture.getValues.asScala) {
       assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
       assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
     }
@@ -334,7 +334,7 @@ class ReassignPartitionsCommandTest  extends ZooKeeperTestHarness  with Logging
     assigner.maybeLimit(Throttle(1000))
 
     //Then other property remains
-    for (actual <- propsCapture.getValues) {
+    for (actual <- propsCapture.getValues.asScala) {
       assertEquals("useful.value", actual.getProperty("useful.key"))
       assertEquals("1000", actual.getProperty(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
       assertEquals("1000", actual.getProperty(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
@@ -369,7 +369,7 @@ class ReassignPartitionsCommandTest  extends ZooKeeperTestHarness  with Logging
     ReassignPartitionsCommand.removeThrottle(zk, status, Map.empty, admin)
 
     //Then props should have gone (dummy remains)
-    for (capture <- propsCapture.getValues) {
+    for (capture <- propsCapture.getValues.asScala) {
       assertEquals("value", capture.get("useful.key"))
       assertNull(capture.get(DynamicConfig.Broker.FollowerReplicationThrottledRateProp))
       assertNull(capture.get(DynamicConfig.Broker.LeaderReplicationThrottledRateProp))
@@ -406,7 +406,7 @@ class ReassignPartitionsCommandTest  extends ZooKeeperTestHarness  with Logging
     ReassignPartitionsCommand.removeThrottle(zk, status, Map.empty, admin)
 
     //Then props should have gone (dummy remains)
-    for (actual <- propsCapture.getValues) {
+    for (actual <- propsCapture.getValues.asScala) {
       assertEquals("value", actual.getProperty("useful.key"))
       assertNull(actual.getProperty(LogConfig.LeaderReplicationThrottledReplicasProp))
       assertNull(actual.getProperty(LogConfig.FollowerReplicationThrottledReplicasProp))
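
The JavaConversions import removed above relied on implicit conversions; JavaConverters makes each
crossing explicit via asScala/asJava, as in the propsCapture.getValues.asScala calls. A standalone
illustration of the explicit style (not part of this patch):

    import java.util.{ArrayList => JArrayList}
    import scala.collection.JavaConverters._

    val javaList = new JArrayList[String]()
    javaList.add("a")
    // asScala wraps the Java collection in place rather than copying it.
    val asSeq: Seq[String] = javaList.asScala
    // asJava converts back for APIs that expect java.util collections.
    val backToJava: java.util.List[String] = asSeq.asJava
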
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index a563c03..c60a7ed 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -17,7 +17,10 @@
 
 package kafka.cluster
 
+import java.nio.charset.StandardCharsets
+
 import kafka.utils.TestUtils
+import kafka.zk.BrokerIdZNode
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert.{assertEquals, assertNotEquals, assertNull}
@@ -44,7 +47,7 @@ class BrokerEndPointTest {
 
   @Test
   def testFromJsonFutureVersion(): Unit = {
-    // `createBroker` should support future compatible versions, we use a hypothetical future version here
+    // Future compatible versions should be supported; we use a hypothetical future version here
     val brokerInfoStr = """{
       "foo":"bar",
       "version":100,
@@ -54,7 +57,7 @@ class BrokerEndPointTest {
       "timestamp":"1416974968782",
       "endpoints":["SSL://localhost:9093"]
     }"""
-    val broker = Broker.createBroker(1, brokerInfoStr)
+    val broker = parseBrokerJson(1, brokerInfoStr)
     assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals("localhost", brokerEndPoint.host)
@@ -71,7 +74,7 @@ class BrokerEndPointTest {
       "timestamp":"1416974968782",
       "endpoints":["PLAINTEXT://localhost:9092"]
     }"""
-    val broker = Broker.createBroker(1, brokerInfoStr)
+    val broker = parseBrokerJson(1, brokerInfoStr)
     assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
     assertEquals("localhost", brokerEndPoint.host)
@@ -81,7 +84,7 @@ class BrokerEndPointTest {
   @Test
   def testFromJsonV1(): Unit = {
     val brokerInfoStr = """{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}"""
-    val broker = Broker.createBroker(1, brokerInfoStr)
+    val broker = parseBrokerJson(1, brokerInfoStr)
     assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
     assertEquals("172.16.8.243", brokerEndPoint.host)
@@ -99,7 +102,7 @@ class BrokerEndPointTest {
       "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
       "rack":"dc1"
     }"""
-    val broker = Broker.createBroker(1, json)
+    val broker = parseBrokerJson(1, json)
     assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals("host1", brokerEndPoint.host)
@@ -119,7 +122,7 @@ class BrokerEndPointTest {
       "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
       "rack":null
     }"""
-    val broker = Broker.createBroker(1, json)
+    val broker = parseBrokerJson(1, json)
     assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
     assertEquals("host1", brokerEndPoint.host)
@@ -138,7 +141,7 @@ class BrokerEndPointTest {
       "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
       "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}
     }"""
-    val broker = Broker.createBroker(1, json)
+    val broker = parseBrokerJson(1, json)
     assertEquals(1, broker.id)
     val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
     assertEquals("host1", brokerEndPoint.host)
@@ -212,4 +215,7 @@ class BrokerEndPointTest {
     assertEquals(9092, endpoint.port)
     assertEquals("PLAINTEXT://MyHostname:9092", endpoint.connectionString)
   }
+
+  private def parseBrokerJson(id: Int, jsonString: String): Broker =
+    BrokerIdZNode.decode(id, jsonString.getBytes(StandardCharsets.UTF_8)).broker
 }
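
With parsing moved out of Broker.createBroker, BrokerIdZNode.decode takes the broker id and the raw
znode bytes and yields a BrokerInfo wrapping the Broker, as the parseBrokerJson helper above shows.
A minimal sketch of the lookup, reusing the v1 JSON payload from these tests:

    import java.nio.charset.StandardCharsets
    import kafka.zk.BrokerIdZNode

    val json = """{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}"""
    val brokerInfo = BrokerIdZNode.decode(1, json.getBytes(StandardCharsets.UTF_8))
    // The decoded BrokerInfo carries the Broker with its id, endpoints and rack.
    assert(brokerInfo.broker.id == 1)
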
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index f46d8cf..c495764 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -27,7 +27,6 @@ import scala.collection._
 import org.junit.Assert._
 import kafka.message._
 import kafka.server._
-import kafka.utils.TestUtils._
 import kafka.utils._
 import org.junit.{Before, Test}
 import kafka.serializer._
@@ -60,7 +59,7 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
       new AtomicLong(0),
       new AtomicInteger(0),
       ""))
-    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+    createTopic(topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 729ab1a..77930e6 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -324,7 +324,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     val zkUtils = ZkUtils(zkConnect, 6000, 30000, false)
 
     // create topic topic1 with 1 partition on broker 0
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    createTopic(topic, numPartitions = 1, replicationFactor = 1)
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(servers, topic, nMessages)
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 32435c6..0ad64c5 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -87,7 +87,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testTopicCreationWithOfflineReplica(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
@@ -115,7 +115,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testTopicPartitionExpansionWithOfflineReplica(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp0 = TopicAndPartition("t", 0)
     val tp1 = TopicAndPartition("t", 1)
@@ -133,7 +133,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testPartitionReassignment(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
 
     val metricName = s"kafka.controller:type=ControllerStats,name=${ControllerState.PartitionReassignment.rateAndTimeMetricName.get}"
     val timerCount = timer(metricName).count
@@ -158,7 +158,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testPartitionReassignmentWithOfflineReplicaHaltingProgress(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(controllerId))
@@ -176,7 +176,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testPartitionReassignmentResumesAfterReplicaComesOnline(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(controllerId))
@@ -199,7 +199,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testPreferredReplicaLeaderElection(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBroker = servers.find(_.config.brokerId != controllerId).get
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
@@ -210,7 +210,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testBackToBackPreferredReplicaLeaderElections(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBroker = servers.find(_.config.brokerId != controllerId).get
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
@@ -222,7 +222,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testPreferredReplicaLeaderElectionWithOfflinePreferredReplica(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
@@ -239,7 +239,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testAutoPreferredReplicaLeaderElection(): Unit = {
     servers = makeServers(2, autoLeaderRebalanceEnable = true)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(1, 0))
@@ -256,7 +256,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled(): Unit = {
     servers = makeServers(2)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId))
@@ -276,7 +276,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled(): Unit = {
     servers = makeServers(2, uncleanLeaderElectionEnable = true)
-    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     val tp = TopicAndPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId))
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index bec5282..a49523f 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -77,7 +77,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
    * Returns the count of messages received.
    */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
-    TestUtils.createTopic(zkClient, topic, 1, 1, servers)
+    createTopic(topic, 1, 1)
 
     val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(
       TestUtils.getBrokerListStrFromServers(servers),
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 6ecec3c..617b326 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -43,7 +43,7 @@ class FetcherTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
+    createTopic(topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)))
 
     val cluster = new Cluster(servers.map { s =>
       new Broker(s.config.brokerId, "localhost", boundPort(s), listenerName, securityProtocol)
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index fed78a5..9c09a43 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -105,6 +105,23 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
   }
 
   /**
+   * Create a topic in ZooKeeper.
+   * Wait until the leader is elected and the metadata is propagated to all brokers.
+   * Return the leader for each partition.
+   */
+  def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1,
+                  topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] =
+    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, topicConfig)
+
+  /**
+   * Create a topic in ZooKeeper using a customized replica assignment.
+   * Wait until the leader is elected and the metadata is propagated to all brokers.
+   * Return the leader for each partition.
+   */
+  def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
+    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, servers)
+
+  /**
    * Pick a broker at random and kill it if it isn't already dead
    * Return the id of the broker killed
    */
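
With these overloads, tests extending the harness can drop the explicit zkClient and servers
arguments, as the updated call sites throughout this patch do. For example (topic names
illustrative):

    // Returns the elected leader per partition once metadata has propagated.
    val leaders = createTopic("my-topic", numPartitions = 3, replicationFactor = 2)
    // Or with an explicit replica assignment: partition -> replica broker ids.
    val assigned = createTopic("assigned-topic", Map(0 -> Seq(0, 1), 1 -> Seq(1, 0)))
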
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index d6c59d2..0cf95e9 100755
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -117,7 +117,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
 
   private def produceAndMultiFetch(producer: Producer[String, String]) {
     for(topic <- List("test1", "test2", "test3", "test4"))
-      TestUtils.createTopic(zkClient, topic, servers = servers)
+      createTopic(topic)
 
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -186,7 +186,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
 
   private def multiProduce(producer: Producer[String, String]) {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
-    topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
+    topics.keys.map(topic => createTopic(topic))
 
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
@@ -214,7 +214,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
   @Test
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
-    TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    createTopic(newTopic, numPartitions = 1, replicationFactor = 1)
 
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
@@ -223,7 +223,7 @@ class PrimitiveApiTest extends ProducerConsumerTestHarness {
   @Test
   def testPipelinedProduceRequests() {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
-    topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
+    topics.keys.map(topic => createTopic(topic))
     val props = new Properties()
     props.put("request.required.acks", "0")
     val pipelinedProducer: Producer[String, String] =
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 215776f..dbd9118 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -57,7 +57,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // create the topic
-    TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
+    createTopic(topic, numParts, 1)
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(servers, nMessages, "batch1")
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index b8794f3..4227764 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -54,7 +54,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   def testMetricsLeak() {
     val topic = "test-metrics-leak"
     // create topic topic1 with 1 partition on broker 0
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    createTopic(topic, numPartitions = 1, replicationFactor = 1)
     // force creation not client's specific metrics.
     createAndShutdownStep(topic, "group0", "consumer0", "producer0")
 
@@ -130,7 +130,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
 
     val topicConfig = new Properties
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2")
-    createTopic(zkClient, topic, 1, numNodes, servers, topicConfig)
+    createTopic(topic, 1, numNodes, topicConfig)
     // Produce a few messages to create the metrics
     TestUtils.produceMessages(servers, topic, nMessages)
 
@@ -176,6 +176,19 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
   }
 
+  /**
+   * Test that the metrics are created with the right name; testZooKeeperStateChangeRateMetrics
+   * and testZooKeeperSessionStateMetric in ZooKeeperClientTest test the metrics behaviour.
+   */
+  @Test
+  def testSessionExpireListenerMetrics(): Unit = {
+    val metrics = Metrics.defaultRegistry.allMetrics
+
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=SessionState"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperExpiresPerSec"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=SessionExpireListener,name=ZooKeeperDisconnectsPerSec"), 1)
+  }
+
   private def meterCount(metricName: String): Long = {
     Metrics.defaultRegistry.allMetrics.asScala
       .filterKeys(_.getMBeanName.endsWith(metricName))
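
The assertions above only verify registration under the corrected SessionExpireListener type;
reading the gauge back uses the same registry lookups the test relies on. A sketch:

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge
    import scala.collection.JavaConverters._

    // Locate the SessionState gauge by the MBean name asserted in the test.
    val sessionState: Option[String] =
      Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
        case (name, gauge: Gauge[_])
          if name.getMBeanName == "kafka.server:type=SessionExpireListener,name=SessionState" =>
          gauge.value.asInstanceOf[String]
      }
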
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 9279d90..5c1d4da 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -96,7 +96,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
+    createTopic("test", numPartitions = 1, replicationFactor = 1)
 
     val message1 = new Message(new Array[Byte](configs.head.messageMaxBytes + 1))
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 646143c..033ca67 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -79,12 +79,16 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     assertTrue(zkUtils.isSecure)
     for (path <- zkUtils.persistentZkPaths) {
       zkUtils.makeSurePersistentPathExists(path)
-      if(!path.equals(ZkUtils.ConsumersPath)) {
+      if (ZkUtils.sensitivePath(path)) {
         val aclList = zkUtils.zkConnection.getAcl(path).getKey
-        assertTrue(aclList.size == 2)
-        for (acl: ACL <- aclList.asScala) {
-          assertTrue(TestUtils.isAclSecure(acl, false))
-        }
+        assertEquals(s"Unexpected acl list size for $path", 1, aclList.size)
+        for (acl <- aclList.asScala)
+          assertTrue(TestUtils.isAclSecure(acl, sensitive = true))
+      } else if (!path.equals(ZkUtils.ConsumersPath)) {
+        val aclList = zkUtils.zkConnection.getAcl(path).getKey
+        assertEquals(s"Unexpected acl list size for $path", 2, aclList.size)
+        for (acl <- aclList.asScala)
+          assertTrue(TestUtils.isAclSecure(acl, sensitive = false))
       }
     }
     // Test that can create: createEphemeralPathExpectConflict
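
The branches above encode the ACL policy now centralised in ZkData: sensitive paths get a single
creator-only ACL, other secure paths additionally allow world reads, and everything is open when
security is off. A sketch of that selection (assumed shape; the real definition lives in ZkData):

    import org.apache.zookeeper.ZooDefs
    import org.apache.zookeeper.data.ACL
    import scala.collection.JavaConverters._

    def aclsFor(isSecure: Boolean, path: String): Seq[ACL] =
      if (!isSecure) ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala
      else if (ZkUtils.sensitivePath(path)) ZooDefs.Ids.CREATOR_ALL_ACL.asScala // one ACL
      else ZooDefs.Ids.CREATOR_ALL_ACL.asScala ++ ZooDefs.Ids.READ_ACL_UNSAFE.asScala // two ACLs
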
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
index 9a3187b..4a47400 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 
 import java.util.Properties
 
-import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
@@ -38,7 +37,7 @@ class AddPartitionsToTxnRequestTest extends BaseRequestTest {
   @Before
   override def setUp(): Unit = {
     super.setUp()
-    TestUtils.createTopic(zkClient, topic1, numPartitions, servers.size, servers, new Properties())
+    createTopic(topic1, numPartitions, servers.size, new Properties())
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
index fabe3d8..1739d27 100644
--- a/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
@@ -54,7 +54,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
       assertTrue(servers.head.logManager.getLog(tp).isEmpty)
     }
 
-    TestUtils.createTopic(zkClient, topic, partitionNum, 1, servers)
+    createTopic(topic, partitionNum, 1)
     (0 until partitionNum).foreach { partition =>
       assertEquals(logDir1, servers.head.logManager.getLog(new TopicPartition(topic, partition)).get.dir.getParent)
     }
@@ -88,7 +88,7 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.LOG_DIR_NOT_FOUND, alterReplicaDirResponse1.responses().get(new TopicPartition(topic, 0)))
     assertEquals(Errors.REPLICA_NOT_AVAILABLE, alterReplicaDirResponse1.responses().get(new TopicPartition(topic, 1)))
 
-    TestUtils.createTopic(zkClient, topic, 3, 1, servers)
+    createTopic(topic, 3, 1)
 
     // Test AlterReplicaDirRequest after topic creation
     val partitionDirs2 = mutable.Map.empty[TopicPartition, String]
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index a6381a6..13a2d23 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -59,7 +59,7 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
   def testErrorCreateTopicsRequests() {
     val timeout = 10000
     val existingTopic = "existing-topic"
-    TestUtils.createTopic(zkClient, existingTopic, 1, 1, servers)
+    createTopic(existingTopic, 1, 1)
 
     // Basic
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 3598626..42e9ff8 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.Properties
 
 import kafka.log.LogConfig
-import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.PolicyViolationException
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.CreateTopicsRequest
@@ -62,7 +61,7 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
   def testErrorCreateTopicsRequests() {
     val timeout = 10000
     val existingTopic = "existing-topic"
-    TestUtils.createTopic(zkClient, existingTopic, 1, 1, servers)
+    createTopic(existingTopic, 1, 1)
 
     // Policy violations
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 237e918..4388e64 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -32,11 +32,11 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
   def testValidDeleteTopicRequests() {
     val timeout = 10000
     // Single topic
-    TestUtils.createTopic(zkClient, "topic-1", 1, 1, servers)
+    createTopic("topic-1", 1, 1)
     validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-1").asJava, timeout).build())
     // Multi topic
-    TestUtils.createTopic(zkClient, "topic-3", 5, 2, servers)
-    TestUtils.createTopic(zkClient, "topic-4", 1, 2, servers)
+    createTopic("topic-3", 5, 2)
+    createTopic("topic-4", 1, 2)
     validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-3", "topic-4").asJava, timeout).build())
   }
 
@@ -61,7 +61,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
       Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
     // Partial
-    TestUtils.createTopic(zkClient, "partial-topic-1", 1, 1, servers)
+    createTopic("partial-topic-1", 1, 1)
     validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(
       "partial-topic-1",
       "partial-invalid-topic").asJava, timeout).build(),
@@ -72,7 +72,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     )
 
     // Timeout
-    TestUtils.createTopic(zkClient, timeoutTopic, 5, 2, servers)
+    createTopic(timeoutTopic, 5, 2)
     // Must be a 0ms timeout to avoid transient test failures. Even a timeout of 1ms has succeeded in the past.
     validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(timeoutTopic).asJava, 0).build(),
       Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 9074ad8..8d1eb2c 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -40,7 +40,7 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
     val onlineDir = new File(servers.head.config.logDirs.head).getAbsolutePath
     val offlineDir = new File(servers.head.config.logDirs.tail.head).getAbsolutePath
     servers.head.replicaManager.handleLogDirFailure(offlineDir)
-    TestUtils.createTopic(zkClient, topic, partitionNum, 1, servers)
+    createTopic(topic, partitionNum, 1)
     TestUtils.produceMessages(servers, topic, 10)
 
     val request = new DescribeLogDirsRequest.Builder(null).build()
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 1d8912b..a426108 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -113,7 +113,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
     val topic = "topic"
     val topicPartition = new TopicPartition(topic, 0)
     val correlationId = -1
-    TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    createTopic(topic, numPartitions = 1, replicationFactor = 1)
 
     val version = ApiKeys.PRODUCE.latestVersion: Short
     val serializedBytes = {
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index f1811ad..9090fda 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -23,7 +23,6 @@ import java.util.Properties
 import kafka.api.KAFKA_0_11_0_IV2
 import kafka.log.LogConfig
 import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -342,8 +341,8 @@ class FetchRequestTest extends BaseRequestTest {
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
     configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
     topics.flatMap { topic =>
-      val partitionToLeader = createTopic(zkClient, topic, numPartitions = numPartitions, replicationFactor = 2,
-        servers = servers, topicConfig = topicConfig)
+      val partitionToLeader = createTopic(topic, numPartitions = numPartitions, replicationFactor = 2,
+        topicConfig = topicConfig)
       partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader }
     }.toMap
   }
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6b3fff3..ba33ab0 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
 import kafka.server.LogDirFailureTest._
 import kafka.api.IntegrationTestHarness
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
-import kafka.utils.{CoreUtils, Exit, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Exit, TestUtils}
 import kafka.zk.LogDirEventNotificationZNode
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
@@ -55,7 +55,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    TestUtils.createTopic(zkClient, topic, partitionNum, serverCount, servers = servers)
+    createTopic(topic, partitionNum, serverCount)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 51e1d95..d4c3e7c 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -85,8 +85,8 @@ class MetadataRequestTest extends BaseRequestTest {
     val internalTopic = Topic.GROUP_METADATA_TOPIC_NAME
     val notInternalTopic = "notInternal"
     // create the topics
-    TestUtils.createTopic(zkClient, internalTopic, 3, 2, servers)
-    TestUtils.createTopic(zkClient, notInternalTopic, 3, 2, servers)
+    createTopic(internalTopic, 3, 2)
+    createTopic(notInternalTopic, 3, 2)
 
     val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
     assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
@@ -104,8 +104,8 @@ class MetadataRequestTest extends BaseRequestTest {
   @Test
   def testNoTopicsRequest() {
     // create some topics
-    TestUtils.createTopic(zkClient, "t1", 3, 2, servers)
-    TestUtils.createTopic(zkClient, "t2", 3, 2, servers)
+    createTopic("t1", 3, 2)
+    createTopic("t2", 3, 2)
 
     // v0, Doesn't support a "no topics" request
     // v1, Empty list represents "no topics"
@@ -128,7 +128,7 @@ class MetadataRequestTest extends BaseRequestTest {
     val topic2 = "t2"
     val topic3 = "t3"
     val topic4 = "t4"
-    TestUtils.createTopic(zkClient, topic1, 1, 1, servers)
+    createTopic(topic1, 1, 1)
 
     val response1 = sendMetadataRequest(new MetadataRequest(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion))
     checkAutoCreatedTopic(topic1, topic2, response1)
@@ -147,8 +147,8 @@ class MetadataRequestTest extends BaseRequestTest {
   @Test
   def testAllTopicsRequest() {
     // create some topics
-    TestUtils.createTopic(zkClient, "t1", 3, 2, servers)
-    TestUtils.createTopic(zkClient, "t2", 3, 2, servers)
+    createTopic("t1", 3, 2)
+    createTopic("t2", 3, 2)
 
     // v0, Empty list represents all topics
     val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 0.toShort))
@@ -167,7 +167,7 @@ class MetadataRequestTest extends BaseRequestTest {
   @Test
   def testPreferredReplica(): Unit = {
     val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
-    TestUtils.createTopic(zkClient, "t1", replicaAssignment, servers)
+    createTopic("t1", replicaAssignment)
     // Call controller and one different broker to ensure that metadata propagation works correctly
     val responses = Seq(
       sendMetadataRequest(new MetadataRequest.Builder(Seq("t1").asJava, true).build(), Some(controllerSocketServer)),
@@ -194,7 +194,7 @@ class MetadataRequestTest extends BaseRequestTest {
     val replicaCount = 3
 
     // create a topic with 3 replicas
-    TestUtils.createTopic(zkClient, replicaDownTopic, 1, replicaCount, servers)
+    createTopic(replicaDownTopic, 1, replicaCount)
 
     // Kill a replica node that is not the leader
     val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 5c80288..886c318 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -74,7 +74,7 @@ class RequestQuotaTest extends BaseRequestTest {
     RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
     super.setUp()
 
-    TestUtils.createTopic(zkClient, topic, numPartitions, 1, servers)
+    createTopic(topic, numPartitions, 1)
     leaderNode = servers.head
 
     // Change default client-id request quota to a small value and a single unthrottledClient with a large quota
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c8a81c7..2c2d9dd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -37,8 +37,8 @@ import kafka.security.auth.{Acl, Authorizer, Resource}
 import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
-import ZkUtils._
 import Implicits._
+import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.{AdminZkClient, BrokerIdsZNode, BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
@@ -60,7 +60,6 @@ import org.junit.Assert._
 import scala.collection.JavaConverters._
 import scala.collection.{Map, mutable}
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.util.Try
 
 /**
  * Utility functions to help with testing
@@ -707,8 +706,8 @@ object TestUtils extends Logging {
       val listenerName = ListenerName.forSecurityProtocol(protocol)
       Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), b.rack)
     }
-    brokers.foreach(b => zkClient.registerBrokerInZk(new BrokerInfo(b.id, "localhost", 6667,
-      b.endPoints, jmxPort = -1, rack = b.rack, ApiVersion.latestVersion)))
+    brokers.foreach(b => zkClient.registerBrokerInZk(BrokerInfo(Broker(b.id, b.endPoints, rack = b.rack),
+      ApiVersion.latestVersion, jmxPort = -1)))
     brokers
   }
 
@@ -753,24 +752,18 @@ object TestUtils extends Logging {
     new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
   }
 
-  def makeLeaderForPartition(zkUtils: ZkUtils,
+  def makeLeaderForPartition(zkClient: KafkaZkClient,
                              topic: String,
                              leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
                              controllerEpoch: Int) {
-    leaderPerPartitionMap.foreach { case (partition, leader) =>
-      try {
-        val newLeaderAndIsr = zkUtils.getLeaderAndIsrForPartition(topic, partition)
-          .map(_.newLeader(leader))
-          .getOrElse(LeaderAndIsr(leader, List(leader)))
-
-        zkUtils.updatePersistentPath(
-          getTopicPartitionLeaderAndIsrPath(topic, partition),
-          zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
-        )
-      } catch {
-        case oe: Throwable => error(s"Error while electing leader for partition [$topic,$partition]", oe)
-      }
+    val newLeaderIsrAndControllerEpochs = leaderPerPartitionMap.map { case (partition, leader) =>
+      val topicPartition = new TopicPartition(topic, partition)
+      val newLeaderAndIsr = zkClient.getTopicPartitionState(topicPartition)
+        .map(_.leaderAndIsr.newLeader(leader))
+        .getOrElse(LeaderAndIsr(leader, List(leader)))
+      topicPartition -> LeaderIsrAndControllerEpoch(newLeaderAndIsr, controllerEpoch)
     }
+    zkClient.setTopicPartitionStatesRaw(newLeaderIsrAndControllerEpochs)
   }
 
   /**
@@ -795,7 +788,7 @@ object TestUtils extends Logging {
     var electedOrChangedLeader: Option[Int] = None
     while (electedOrChangedLeader.isEmpty && System.currentTimeMillis() < startTime + timeoutMs) {
       // check if leader is elected
-      leader = zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
+      leader = zkClient.getLeaderForPartition(topicPartition)
       leader match {
         case Some(l) => (newLeaderOpt, oldLeaderOpt) match {
           case (Some(newLeader), _) if newLeader == l =>
@@ -949,13 +942,9 @@ object TestUtils extends Logging {
     leader
   }
 
-  def waitUntilControllerElected(zkUtils: ZkUtils, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
-    var controllerIdTry: Try[Int] = null
-    TestUtils.waitUntilTrue(() => {
-      controllerIdTry = Try { zkUtils.getController() }
-      controllerIdTry.isSuccess
-    }, s"Controller not elected after $timeout ms", waitTime = timeout)
-    controllerIdTry.get
+  def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
+    val (controllerId, _) = TestUtils.computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
+    controllerId.getOrElse(fail(s"Controller not elected after $timeout ms"))
   }
 
   def waitUntilLeaderIsKnown(servers: Seq[KafkaServer], topic: String, partition: Int,
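
waitUntilControllerElected now polls zkClient.getControllerId through TestUtils.computeUntilTrue
instead of wrapping zkUtils.getController() in a Try. Assuming computeUntilTrue re-evaluates the
expression until the predicate holds or the wait time elapses, returning the last value together
with the final predicate result, its shape is roughly:

    // Assumed semantics only; the real helper is defined elsewhere in TestUtils.
    def computeUntilTrue[T](compute: => T, waitTime: Long = 15000L, pause: Long = 100L)
                           (predicate: T => Boolean): (T, Boolean) = {
      val deadline = System.currentTimeMillis + waitTime
      var result = compute
      while (!predicate(result) && System.currentTimeMillis < deadline) {
        Thread.sleep(pause)
        result = compute
      }
      (result, predicate(result))
    }
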
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 1fdc3ea..fe5fbff 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -105,7 +105,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
     // create the topic
     adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaMap = leaderForPartitionMap.keys.map(p => p -> zkClient.getReplicasForPartition(new TopicPartition(topic, p))).toMap
     assertEquals(expectedReplicaAssignment.size, actualReplicaMap.size)
     for(i <- 0 until actualReplicaMap.size)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index f0d6cf0..f3b8e81 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -20,7 +20,7 @@ import java.util.{Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
 
 import kafka.api.ApiVersion
-import kafka.cluster.EndPoint
+import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
 import kafka.server.ConfigType
@@ -419,15 +419,12 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   def testBrokerRegistrationMethods() {
     zkClient.createTopLevelPaths()
 
-    val brokerInfo = new BrokerInfo(1, "test.host", 9999,
+    val brokerInfo = BrokerInfo(Broker(1,
       Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
-      9998, None, ApiVersion.latestVersion)
+      rack = None), ApiVersion.latestVersion, jmxPort = 9998)
 
     zkClient.registerBrokerInZk(brokerInfo)
-    val broker = zkClient.getBroker(1).getOrElse(fail("Unregistered broker"))
-
-    assertEquals(brokerInfo.id, broker.id)
-    assertEquals(brokerInfo.endpoints(), broker.endPoints.mkString(","))
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
   }
 
   @Test
@@ -448,9 +445,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   def testCreateTopLevelPaths() {
     zkClient.createTopLevelPaths()
 
-    ZkData.PersistentZkPaths.foreach {
-      path => assertTrue(zkClient.pathExists(path))
-    }
+    ZkData.PersistentZkPaths.foreach(path => assertTrue(zkClient.pathExists(path)))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a437810..c7c3152 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -32,7 +32,6 @@ import scala.collection.JavaConverters._
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
-import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.utils.Time
 
 @Category(Array(classOf[IntegrationTest]))
@@ -45,7 +44,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkUtils: ZkUtils = null
-  var zooKeeperClient: ZooKeeperClient = null
   var zkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
 
@@ -58,10 +56,8 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
     zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
-
-    val time = Time.SYSTEM
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, time)
-    zkClient = new KafkaZkClient(zooKeeperClient, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), time)
+    zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
     adminZkClient = new AdminZkClient(zkClient)
   }
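
The harness now builds its client through the new KafkaZkClient factory rather than wiring up a
ZooKeeperClient by hand. Per the commit description the factory owns the metric-name defaults
removed from ZooKeeperClient; a hedged sketch of what it presumably does (parameter names and
defaults are assumptions, with the group/type matching the MBean names asserted in MetricsTest):

    // Sketch only; not the actual factory body.
    object KafkaZkClient {
      def apply(connectString: String, isSecure: Boolean, sessionTimeoutMs: Int,
                connectionTimeoutMs: Int, maxInFlightRequests: Int, time: Time,
                metricGroup: String = "kafka.server",
                metricType: String = "SessionExpireListener"): KafkaZkClient = {
        val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs,
          connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType)
        new KafkaZkClient(zooKeeperClient, isSecure, time)
      }
    }
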
 
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index d402abb..c8ebaa9 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -16,20 +16,19 @@
  */
 package kafka.zookeeper
 
-import java.net.UnknownHostException
 import java.nio.charset.StandardCharsets
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
 
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Meter
+import com.yammer.metrics.core.{Gauge, Meter, MetricName}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.ZooKeeper.States
 import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
 import org.junit.{After, Before, Test}
@@ -40,33 +39,41 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   private val mockPath = "/foo"
   private val time = Time.SYSTEM
 
+  private var zooKeeperClient: ZooKeeperClient = _
+
   @Before
   override def setUp() {
     cleanMetricsRegistry()
     super.setUp()
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
+      Time.SYSTEM, "testMetricGroup", "testMetricType")
   }
 
   @After
   override def tearDown() {
+    if (zooKeeperClient != null)
+      zooKeeperClient.close()
     super.tearDown()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
-    Configuration.setConfiguration(null)
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def testUnresolvableConnectString(): Unit = {
-    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, time)
+    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, time, "testMetricGroup",
+      "testMetricType").close()
   }
 
   @Test(expected = classOf[ZooKeeperClientTimeoutException])
   def testConnectionTimeout(): Unit = {
     zookeeper.shutdown()
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, time)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, time, "testMetricGroup",
+      "testMetricType").close()
   }
 
   @Test
   def testConnection(): Unit = {
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
+      "testMetricType").close()
   }
 
   @Test
@@ -80,8 +87,8 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testDeleteExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
     val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
     assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode)
@@ -96,7 +103,8 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testExistsExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
     val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
     assertEquals("Response code for exists should be OK", Code.OK, existsResponse.resultCode)
@@ -331,53 +339,60 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       }
     }
 
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time)
-    zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
-    zooKeeperClient.reinitialize()
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
+      "testMetricGroup", "testMetricType")
+    try {
+      zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
+      zooKeeperClient.reinitialize()
 
-    assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+      assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+    } finally zooKeeperClient.close()
   }
 
   @Test
   def testConnectionLossRequestTermination(): Unit = {
     val batchSize = 10
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, time)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, time,
+      "testGroupType", "testGroupName")
     zookeeper.shutdown()
-    val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
-    val countDownLatch = new CountDownLatch(1)
-    val running = new AtomicBoolean(true)
-    val unexpectedResponses = new ArrayBlockingQueue[GetDataResponse](batchSize)
-    val requestThread = new Thread {
-      override def run(): Unit = {
-        while (running.get()) {
-          val responses = zooKeeperClient.handleRequests(requests)
-          val suffix = responses.dropWhile(response => response.resultCode != Code.CONNECTIONLOSS)
-          if (!suffix.forall(response => response.resultCode == Code.CONNECTIONLOSS))
-            responses.foreach(unexpectedResponses.add)
-          if (!unexpectedResponses.isEmpty || suffix.nonEmpty)
-            running.set(false)
+    try {
+      val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
+      val countDownLatch = new CountDownLatch(1)
+      val running = new AtomicBoolean(true)
+      val unexpectedResponses = new ArrayBlockingQueue[GetDataResponse](batchSize)
+      val requestThread = new Thread {
+        override def run(): Unit = {
+          while (running.get()) {
+            val responses = zooKeeperClient.handleRequests(requests)
+            val suffix = responses.dropWhile(response => response.resultCode != Code.CONNECTIONLOSS)
+            if (!suffix.forall(response => response.resultCode == Code.CONNECTIONLOSS))
+              responses.foreach(unexpectedResponses.add)
+            if (!unexpectedResponses.isEmpty || suffix.nonEmpty)
+              running.set(false)
+          }
+          countDownLatch.countDown()
         }
-        countDownLatch.countDown()
       }
-    }
-    requestThread.start()
-    val requestThreadTerminated = countDownLatch.await(30, TimeUnit.SECONDS)
-    if (!requestThreadTerminated) {
-      running.set(false)
-      requestThread.join(5000)
-      fail("Failed to receive a CONNECTIONLOSS response code after zookeeper has shutdown.")
-    } else if (!unexpectedResponses.isEmpty) {
-      fail(s"Received an unexpected non-CONNECTIONLOSS response code after a CONNECTIONLOSS response code from a single batch: $unexpectedResponses")
-    }
+      requestThread.start()
+      val requestThreadTerminated = countDownLatch.await(30, TimeUnit.SECONDS)
+      if (!requestThreadTerminated) {
+        running.set(false)
+        requestThread.join(5000)
+        fail("Failed to receive a CONNECTIONLOSS response code after zookeeper has shutdown.")
+      } else if (!unexpectedResponses.isEmpty) {
+        fail(s"Received an unexpected non-CONNECTIONLOSS response code after a CONNECTIONLOSS response code from a single batch: $unexpectedResponses")
+      }
+    } finally zooKeeperClient.close()
   }
 
-  @Test
-  def testSessionExpireListenerMetrics() {
-    val metrics = Metrics.defaultRegistry
+  def isExpectedMetricName(metricName: MetricName, name: String): Boolean =
+    metricName.getName == name && metricName.getGroup == "testMetricGroup" && metricName.getType == "testMetricType"
 
+  @Test
+  def testZooKeeperStateChangeRateMetrics(): Unit = {
     def checkMeterCount(name: String, expected: Long) {
-      val meter = metrics.allMetrics.asScala.collectFirst {
-        case (metricName, meter: Meter) if metricName.getName == name => meter
+      val meter = Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
+        case (metricName, meter: Meter) if isExpectedMetricName(metricName, name) => meter
       }.getOrElse(sys.error(s"Unable to find meter with name $name"))
       assertEquals(s"Unexpected meter count for $name", expected, meter.count)
     }
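The connection-loss test above encodes an invariant worth stating explicitly:
within one pipelined batch, once a response comes back CONNECTIONLOSS, every
later response in the batch must be CONNECTIONLOSS as well. A distillation of
that check (the same logic the test applies, not a new API):

    import org.apache.zookeeper.KeeperException.Code

    // After the first CONNECTIONLOSS in a batch, no later response may carry
    // a different result code; requests queued behind a lost connection fail
    // as a unit.
    def batchFailsAtomically(resultCodes: Seq[Code]): Boolean = {
      val fromFirstLoss = resultCodes.dropWhile(_ != Code.CONNECTIONLOSS)
      fromFirstLoss.forall(_ == Code.CONNECTIONLOSS)
    }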
@@ -396,6 +411,23 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     checkMeterCount(disconnectsPerSecName, 1)
   }
 
+  @Test
+  def testZooKeeperSessionStateMetric(): Unit = {
+    def gaugeValue(name: String): Option[String] = {
+      Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
+        case (metricName, gauge: Gauge[_]) if isExpectedMetricName(metricName, name) => gauge.value.asInstanceOf[String]
+      }
+    }
+
+    assertEquals(Some(States.CONNECTED.toString), gaugeValue("SessionState"))
+    assertEquals(States.CONNECTED, zooKeeperClient.connectionState)
+
+    zooKeeperClient.close()
+
+    assertEquals(None, gaugeValue("SessionState"))
+    assertEquals(States.CLOSED, zooKeeperClient.connectionState)
+  }
+
   private def cleanMetricsRegistry() {
     val metrics = Metrics.defaultRegistry
     metrics.allMetrics.keySet.asScala.foreach(metrics.removeMetric)
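Both new metric tests identify a metric by the (group, type, name) triple,
where group and type are exactly the constructor arguments and name is the
metric's own identifier, e.g. SessionState. A self-contained version of the
lookup the tests perform inline (a sketch against the Yammer default
registry):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge
    import scala.collection.JavaConverters._

    // Find the SessionState gauge by (group, type, name) and read its value.
    def sessionStateGauge(group: String, tpe: String): Option[String] =
      Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
        case (mn, g: Gauge[_]) if mn.getGroup == group && mn.getType == tpe &&
          mn.getName == "SessionState" => g.value.asInstanceOf[String]
      }

While the client is open this should return Some("CONNECTED"); after close()
it returns None because the client unregisters its metrics, which is exactly
what the new SessionState test asserts.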
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 0551fac..855bcea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -20,7 +20,6 @@ import kafka.log.LogConfig;
 import kafka.utils.MockTime;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
@@ -93,13 +92,9 @@ public class InternalTopicIntegrationTest {
     }
 
     private Properties getTopicConfigProperties(final String changelog) {
-        final ZooKeeperClient zkClient = new ZooKeeperClient(
-                CLUSTER.zKConnectString(),
-                DEFAULT_ZK_SESSION_TIMEOUT_MS,
-                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-                Integer.MAX_VALUE, Time.SYSTEM,
+        final KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false,
+                DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM,
                 "testMetricGroup", "testMetricType");
-        final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
         try {
             final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
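Replacing the hand-wired ZooKeeperClient-plus-KafkaZkClient pair with the
KafkaZkClient.apply factory keeps the raw client out of caller code entirely.
Judging by the call sites in this patch (a sketch, not the factory's actual
body), the factory composes the same two constructors the old code called:

    import kafka.zk.KafkaZkClient
    import kafka.zookeeper.ZooKeeperClient
    import org.apache.kafka.common.utils.Time

    // Sketch inferred from the call sites: build the underlying session
    // client and hand it to the wrapper, so callers only ever see (and
    // close) the KafkaZkClient.
    def createKafkaZkClient(connectString: String, isSecure: Boolean, sessionTimeoutMs: Int,
                            connectionTimeoutMs: Int, maxInFlightRequests: Int, time: Time,
                            metricGroup: String, metricType: String): KafkaZkClient = {
      val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs,
        connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType)
      new KafkaZkClient(zooKeeperClient, isSecure, time)
    }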
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 275d580..6aafac0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -25,7 +25,6 @@ import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
-import kafka.zookeeper.ZooKeeperClient;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Time;
 import org.junit.rules.TemporaryFolder;
@@ -171,36 +170,24 @@ public class KafkaEmbedded {
                             final Properties topicConfig) {
         log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
             topic, partitions, replication, topicConfig);
+        try (KafkaZkClient kafkaZkClient = createZkClient()) {
+            final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+            adminZkClient.createTopic(topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
+        }
+    }
 
-        final ZooKeeperClient zkClient = new ZooKeeperClient(
-                zookeeperConnect(),
-                DEFAULT_ZK_SESSION_TIMEOUT_MS,
-                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-                Integer.MAX_VALUE,
-                Time.SYSTEM,
-                "testMetricGroup",
-                "testMetricType");
-        final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
-        final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-        adminZkClient.createTopic(topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
-        kafkaZkClient.close();
+    private KafkaZkClient createZkClient() {
+        return KafkaZkClient.apply(zookeeperConnect(), false, DEFAULT_ZK_SESSION_TIMEOUT_MS,
+                DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, "testMetricGroup", "testMetricType");
     }
 
     public void deleteTopic(final String topic) {
         log.debug("Deleting topic { name: {} }", topic);
 
-        final ZooKeeperClient zkClient = new ZooKeeperClient(
-                zookeeperConnect(),
-                DEFAULT_ZK_SESSION_TIMEOUT_MS,
-                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-                Integer.MAX_VALUE,
-                Time.SYSTEM,
-                "testMetricGroup",
-                "testMetricType");
-        final KafkaZkClient kafkaZkClient = new KafkaZkClient(zkClient, false, Time.SYSTEM);
-        final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-        adminZkClient.deleteTopic(topic);
-        kafkaZkClient.close();
+        try (KafkaZkClient kafkaZkClient = createZkClient()) {
+            final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
+            adminZkClient.deleteTopic(topic);
+        }
     }
 
     public KafkaServer kafkaServer() {
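For completeness, the Scala analogue of the try-with-resources blocks above
(a usage sketch; the connect string, timeouts, and topic name are
placeholders, not values from this patch):

    import java.util.Properties
    import kafka.admin.RackAwareMode
    import kafka.zk.{AdminZkClient, KafkaZkClient}
    import org.apache.kafka.common.utils.Time

    // Create a topic through AdminZkClient and always close the
    // KafkaZkClient, which also closes the underlying ZooKeeper session.
    val zkClient = KafkaZkClient.apply("localhost:2181", false, 10000, 8000,
      Int.MaxValue, Time.SYSTEM, "testMetricGroup", "testMetricType")
    try {
      val adminZkClient = new AdminZkClient(zkClient)
      adminZkClient.createTopic("example-topic", 1, 1, new Properties, RackAwareMode.Enforced)
    } finally zkClient.close()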

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].