Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/15 16:41:48 UTC

[kafka] branch trunk updated: MINOR: Update test classes to use KafkaZkClient methods (#4367)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d9d0d79  MINOR: Update test classes to use KafkaZkClient methods (#4367)
d9d0d79 is described below

commit d9d0d79287eeec0a1c3dcc2203288421284b5ca1
Author: Manikumar Reddy O <ma...@gmail.com>
AuthorDate: Thu Feb 15 22:11:38 2018 +0530

    MINOR: Update test classes to use KafkaZkClient methods (#4367)
    
    Remove ZkUtils reference from ZooKeeperTestHarness, plus some minor cleanups.
---
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  2 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 22 +++++-
 .../kafka/api/AuthorizerIntegrationTest.scala      | 12 +--
 .../integration/kafka/api/ConsumerBounceTest.scala |  2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala    | 16 ++--
 .../kafka/api/SaslPlainPlaintextConsumerTest.scala |  5 +-
 .../SaslPlainSslEndToEndAuthorizationTest.scala    |  5 +-
 .../SaslScramSslEndToEndAuthorizationTest.scala    |  4 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    | 34 +++++---
 .../unit/kafka/admin/DeleteConsumerGroupTest.scala | 18 ++++-
 .../scala/unit/kafka/admin/TopicCommandTest.scala  |  8 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |  7 +-
 .../controller/ControllerIntegrationTest.scala     | 92 +++++++++++-----------
 .../scala/unit/kafka/integration/FetcherTest.scala | 12 ++-
 .../kafka/security/auth/ZkAuthorizationTest.scala  | 11 ++-
 .../unit/kafka/server/LeaderElectionTest.scala     |  8 +-
 .../unit/kafka/server/LogDirFailureTest.scala      |  4 +-
 .../kafka/server/ServerGenerateBrokerIdTest.scala  |  8 --
 .../kafka/server/ServerGenerateClusterIdTest.scala |  6 +-
 .../unit/kafka/server/ServerStartupTest.scala      |  6 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala    | 15 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 15 ++--
 .../test/scala/unit/kafka/utils/ZkUtilsTest.scala  | 24 +++++-
 .../test/scala/unit/kafka/zk/ZKEphemeralTest.scala | 13 +--
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  6 +-
 25 files changed, 216 insertions(+), 139 deletions(-)
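
The recurring pattern in the test diffs below: ZooKeeperTestHarness no longer
exposes a shared zkUtils field, so tests that still need the legacy ZkUtils API
construct their own instance in setUp and close it in tearDown, while everything
else moves to the harness's KafkaZkClient (zkClient). A minimal sketch of that
pattern, assuming the harness fields used throughout the diffs (zkConnect,
zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled); the test class name is
illustrative:

import kafka.utils.{CoreUtils, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.junit.{After, Before}

class MyZkUtilsBasedTest extends ZooKeeperTestHarness {
  var zkUtils: ZkUtils = null

  @Before
  override def setUp() {
    super.setUp()
    // The harness no longer provides a ZkUtils instance; open one locally.
    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout,
      zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
  }

  @After
  override def tearDown() {
    // Close quietly so a failed close cannot mask a test failure.
    if (zkUtils != null)
      CoreUtils.swallow(zkUtils.close(), this)
    super.tearDown()
  }
}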

diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index e04bce0..d5fde4d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -592,7 +592,7 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   /**
-   * Update the value of a persistent node with the given path and data.
+   * Update the value of an ephemeral node with the given path and data.
    * create parent directory if necessary. Never throw NodeExistException.
    */
   def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = {
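
A hedged usage sketch of the method documented above; the znode path and
payload are illustrative, and zkUtils is assumed to be a connected
kafka.utils.ZkUtils:

// Updates the ephemeral node's data, creating parent nodes if necessary.
zkUtils.updateEphemeralPath("/example/ephemeral-node", "payload")
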
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index b545455..afc8202 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -761,13 +761,31 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   /**
    * Gets the leader for a given partition
-   * @param partition
+   * @param partition The partition for which we want to get the leader.
    * @return optional integer if the leader exists and None otherwise.
    */
   def getLeaderForPartition(partition: TopicPartition): Option[Int] =
     getTopicPartitionState(partition).map(_.leaderAndIsr.leader)
 
   /**
+   * Gets the in-sync replicas (ISR) for a specific topicPartition
+   * @param partition The partition for which we want to get ISR.
+   * @return optional ISR if it exists and None otherwise
+   */
+  def getInSyncReplicasForPartition(partition: TopicPartition): Option[Seq[Int]] =
+    getTopicPartitionState(partition).map(_.leaderAndIsr.isr)
+
+
+  /**
+   * Gets the leader epoch for a specific topicPartition
+   * @param partition The partition for which we want to get the leader epoch
+   * @return optional integer if the leader exists and None otherwise
+   */
+  def getEpochForPartition(partition: TopicPartition): Option[Int] = {
+    getTopicPartitionState(partition).map(_.leaderAndIsr.leaderEpoch)
+  }
+
+  /**
    * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
    * @return sequence of znode names and not the absolute znode path.
    */
@@ -1356,7 +1374,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
     }
   }
 
-  private[zk] def pathExists(path: String): Boolean = {
+  def pathExists(path: String): Boolean = {
     val existsRequest = ExistsRequest(path)
     val existsResponse = retryRequestUntilConnected(existsRequest)
     existsResponse.resultCode match {
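
The three accessors added above share one shape: read the partition state znode
via getTopicPartitionState and project a single field, returning None when the
znode is absent. A hedged usage sketch (assuming a connected KafkaZkClient
named zkClient and an existing topic "t"; the now-public pathExists is shown
alongside):

import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("t", 0)
val leader: Option[Int]   = zkClient.getLeaderForPartition(tp)          // e.g. Some(0)
val isr: Option[Seq[Int]] = zkClient.getInSyncReplicasForPartition(tp)  // e.g. Some(Seq(0, 1))
val epoch: Option[Int]    = zkClient.getEpochForPartition(tp)           // e.g. Some(0)
val exists: Boolean       = zkClient.pathExists("/brokers/topics/t")    // now callable from tests
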
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 248d219..ab7ca64 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -245,14 +245,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkClient, GROUP_METADATA_TOPIC_NAME,
-      1,
-      1,
-      servers,
-      servers.head.groupCoordinator.offsetsTopicConfigs)
+    createTopic(GROUP_METADATA_TOPIC_NAME, topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
     // create the test topic with all the brokers as replicas
-    TestUtils.createTopic(zkClient, topic, 1, 1, this.servers)
-    TestUtils.createTopic(zkClient, deleteTopic, 1, 1, this.servers)
+    createTopic(topic)
+    createTopic(deleteTopic)
   }
 
   @After
@@ -711,7 +707,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     // create an unmatched topic
     val unmatchedTopic = "unmatched"
-    TestUtils.createTopic(zkClient, unmatchedTopic, 1, 1, this.servers)
+    createTopic(unmatchedTopic)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic))
     sendRecords(1, new TopicPartition(unmatchedTopic, part))
     removeAllAcls()
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 58d1be9..53b3ed6 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -173,7 +173,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     val consumer = this.consumers.head
     consumer.subscribe(Collections.singleton(newtopic))
     executor.schedule(new Runnable {
-        def run() = TestUtils.createTopic(zkClient, newtopic, serverCount, serverCount, servers)
+        def run() = createTopic(newtopic, numPartitions = serverCount, replicationFactor = serverCount)
       }, 2, TimeUnit.SECONDS)
     consumer.poll(0)
 
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index b9fb657..8ca5163 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -88,7 +88,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testTooLargeRecordWithAckZero() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    createTopic(topic1, replicationFactor = numServers)
 
     // send a too-large record
     val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -105,7 +105,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testTooLargeRecordWithAckOne() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    createTopic(topic1, replicationFactor = numServers)
 
     // send a too-large record
     val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
@@ -122,7 +122,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
 
     // create topic
     val topic10 = "topic10"
-    TestUtils.createTopic(zkClient, topic10, servers.size, numServers, servers, topicConfig)
+    createTopic(topic10, numPartitions = servers.size, replicationFactor = numServers, topicConfig)
 
     // send a record that is too large for replication, but within the broker max message limit
     val value = new Array[Byte](maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD)
@@ -169,7 +169,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testWrongBrokerList() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    createTopic(topic1, replicationFactor = numServers)
 
     // producer with incorrect broker list
     producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
@@ -188,7 +188,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testInvalidPartition() {
     // create topic with a single partition
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    createTopic(topic1, numPartitions = 1, replicationFactor = numServers)
 
     // create a record with incorrect partition id (higher than the number of partitions), send should fail
     val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes)
@@ -203,7 +203,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testSendAfterClosed() {
     // create topic
-    TestUtils.createTopic(zkClient, topic1, 1, numServers, servers)
+    createTopic(topic1, replicationFactor = numServers)
 
     val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
 
@@ -241,7 +241,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas",(numServers+1).toString)
 
-    TestUtils.createTopic(zkClient, topicName, 1, numServers, servers, topicProps)
+    createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)
 
     val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
     try {
@@ -261,7 +261,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas", numServers.toString)
 
-    TestUtils.createTopic(zkClient, topicName, 1, numServers, servers,topicProps)
+    createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)
 
     val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
     // this should work with all brokers up and running
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 99ddcc3..5789d1a 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -16,8 +16,9 @@ import java.io.File
 import java.util.Locale
 
 import kafka.server.KafkaConfig
-import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
 
@@ -51,6 +52,8 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
    */
   @Test
   def testZkAclsDisabled() {
+    val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
     TestUtils.verifyUnsecureZkAcls(zkUtils)
+    CoreUtils.swallow(zkUtils.close(), this)
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index c28de3e..08351aa 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -16,8 +16,9 @@
   */
 package kafka.api
 
-import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils, ZkUtils}
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
 import org.junit.Test
 
@@ -56,6 +57,8 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
    */
   @Test
   def testAcls() {
+    val zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
     TestUtils.verifySecureZkAcls(zkUtils, 1)
+    CoreUtils.swallow(zkUtils.close(), this)
   }
 }
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index a4f3e0b..f07f4b4 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -19,6 +19,8 @@ package kafka.api
 import org.apache.kafka.common.security.scram.ScramMechanism
 import kafka.utils.JaasTestUtils
 import kafka.utils.ZkUtils
+import kafka.zk.ConfigEntityChangeNotificationZNode
+
 import scala.collection.JavaConverters._
 import org.junit.Before
 
@@ -32,7 +34,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
 
   override def configureSecurityBeforeServersStart() {
     super.configureSecurityBeforeServersStart()
-    zkClient.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath)
+    zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
     // Create broker credentials before starting brokers
     createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
   }
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 3b5f888..e0e26a8 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,12 +22,12 @@ import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, Invali
 import org.apache.kafka.common.metrics.Quota
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.{After, Test}
+import org.junit.{After, Before, Test}
 import java.util.Properties
 
 import kafka.utils._
 import kafka.log._
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ConfigEntityZNode, PreferredReplicaElectionZNode, ZooKeeperTestHarness}
 import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
 import java.io.File
@@ -39,6 +39,7 @@ import kafka.utils.TestUtils._
 import scala.collection.{Map, Set, immutable}
 import kafka.utils.CoreUtils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.security.JaasUtils
 
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -46,9 +47,18 @@ import scala.util.Try
 class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
   var servers: Seq[KafkaServer] = Seq()
+  var zkUtils: ZkUtils = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
+  }
 
   @After
   override def tearDown() {
+    if (zkUtils != null)
+     CoreUtils.swallow(zkUtils.close(), this)
     TestUtils.shutdownServers(servers)
     super.tearDown()
   }
@@ -212,9 +222,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     // in sync replicas should not have any replica that is not in the new assigned replicas
-    checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
   }
@@ -242,8 +252,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
   }
@@ -271,8 +281,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
   }
@@ -313,9 +323,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
                             "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(zkUtils, topic, partitionToBeReassigned, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     // ensure that there are no under replicated partitions
-    ensureNoUnderReplicatedPartitions(zkUtils, topic, partitionToBeReassigned, assignedReplicas, servers)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
   }
@@ -326,7 +336,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val partitionsForPreferredReplicaElection = Set(new TopicPartition("test", 1), new TopicPartition("test2", 1))
     PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
     // try to read it back and compare with what was written
-    val preferredReplicaElectionZkData = zkUtils.readData(ZkUtils.PreferredReplicaLeaderElectionPath)._1
+    val preferredReplicaElectionZkData = zkUtils.readData(PreferredReplicaElectionZNode.path)._1
     val partitionsUndergoingPreferredReplicaElection =
       PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
     assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
@@ -531,7 +541,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // Write config without notification to ZK.
     val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000")
     val map = Map("version" -> 1, "config" -> configMap.asJava)
-    zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))
+    zkUtils.updatePersistentPath(ConfigEntityZNode.path(ConfigType.Client, clientId), Json.encodeAsString(map.asJava))
 
     val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client)
     assertEquals("Must have 1 overriden client config", 1, configInZk.size)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index 8a731cf..da17f16 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -20,15 +20,31 @@ import java.nio.charset.StandardCharsets
 
 import kafka.utils._
 import kafka.server.KafkaConfig
-import org.junit.Test
+import org.junit.{After, Before, Test}
 import kafka.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import kafka.integration.KafkaServerTestHarness
+import org.apache.kafka.common.security.JaasUtils
 
 
 @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class DeleteConsumerGroupTest extends KafkaServerTestHarness {
   def generateConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
+  var zkUtils: ZkUtils = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
+  }
+
+  @After
+  override def tearDown() {
+    if (zkUtils != null)
+     CoreUtils.swallow(zkUtils.close(), this)
+    super.tearDown()
+  }
+
 
   @Test
   def testGroupWideDeleteInZK() {
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 291082c..6a276df 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -80,9 +80,9 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // delete the NormalTopic
     val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
     val deletePath = getDeleteTopicPath(normalTopic)
-    assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deletePath))
+    assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deletePath))
     TopicCommand.deleteTopic(zkClient, deleteOpts)
-    assertTrue("Delete path for topic should exist after deletion.", zkUtils.pathExists(deletePath))
+    assertTrue("Delete path for topic should exist after deletion.", zkClient.pathExists(deletePath))
 
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
@@ -93,11 +93,11 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
     val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
     val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
-    assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
+    assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.pathExists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
       TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
     }
-    assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
+    assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.pathExists(deleteOffsetTopicPath))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 77930e6..b4381a4 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -28,8 +28,9 @@ import kafka.serializer._
 import kafka.server._
 import kafka.utils.TestUtils._
 import kafka.utils._
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.log4j.{Level, Logger}
-import org.junit.{Test, After, Before}
+import org.junit.{After, Before, Test}
 
 import scala.collection._
 
@@ -43,6 +44,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
   val topic = "topic1"
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+  var zkUtils: ZkUtils = null
 
   override def generateConfigs =
     TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps))
@@ -57,11 +59,14 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging
   @Before
   override def setUp() {
     super.setUp()
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
     dirs = new ZKGroupTopicDirs(group, topic)
   }
 
   @After
   override def tearDown() {
+    if (zkUtils != null)
+     CoreUtils.swallow(zkUtils.close(), this)
     super.tearDown()
   }
 
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 0ad64c5..88fe82b 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -20,12 +20,12 @@ package kafka.controller
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Timer
 import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils
+import kafka.zk.{PreferredReplicaElectionZNode, ZooKeeperTestHarness}
 import org.junit.{After, Before, Test}
 import org.junit.Assert.assertTrue
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConverters._
 
@@ -47,37 +47,37 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testEmptyCluster(): Unit = {
     servers = makeServers(1)
-    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
     waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
   }
 
   @Test
   def testControllerEpochPersistsWhenAllBrokersDown(): Unit = {
     servers = makeServers(1)
-    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
     waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
     servers.head.shutdown()
     servers.head.awaitShutdown()
-    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ControllerPath), "failed to kill controller")
+    TestUtils.waitUntilTrue(() => !zkClient.getControllerId.isDefined, "failed to kill controller")
     waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "controller epoch was not persisted after broker failure")
   }
 
   @Test
   def testControllerMoveIncrementsControllerEpoch(): Unit = {
     servers = makeServers(1)
-    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
     waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
     servers.head.shutdown()
     servers.head.awaitShutdown()
     servers.head.startup()
-    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
     waitUntilControllerEpoch(KafkaController.InitialControllerEpoch + 1, "controller epoch was not incremented after controller move")
   }
 
   @Test
   def testTopicCreation(): Unit = {
     servers = makeServers(1)
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(0))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
@@ -91,7 +91,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers.take(1))
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
@@ -101,12 +101,12 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   @Test
   def testTopicPartitionExpansion(): Unit = {
     servers = makeServers(1)
-    val tp0 = TopicAndPartition("t", 0)
-    val tp1 = TopicAndPartition("t", 1)
+    val tp0 = new TopicPartition("t", 0)
+    val tp1 = new TopicPartition("t", 1)
     val assignment = Map(tp0.partition -> Seq(0))
-    val expandedAssignment = Map(tp0.partition -> Seq(0), tp1.partition -> Seq(0))
+    val expandedAssignment = Map(tp0 -> Seq(0), tp1 -> Seq(0))
     TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
-    zkUtils.updatePersistentPath(ZkUtils.getTopicPath(tp0.topic), zkUtils.replicaAssignmentZkData(expandedAssignment.map(kv => kv._1.toString -> kv._2)))
+    zkClient.setTopicAssignment(tp0.topic, expandedAssignment)
     waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic partition expansion")
     TestUtils.waitUntilMetadataIsPropagated(servers, tp1.topic, tp1.partition)
@@ -117,14 +117,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
-    val tp0 = TopicAndPartition("t", 0)
-    val tp1 = TopicAndPartition("t", 1)
+    val tp0 = new TopicPartition("t", 0)
+    val tp1 = new TopicPartition("t", 1)
     val assignment = Map(tp0.partition -> Seq(otherBrokerId, controllerId))
-    val expandedAssignment = Map(tp0.partition -> Seq(otherBrokerId, controllerId), tp1.partition -> Seq(otherBrokerId, controllerId))
+    val expandedAssignment = Map(tp0 -> Seq(otherBrokerId, controllerId), tp1 -> Seq(otherBrokerId, controllerId))
     TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
-    zkUtils.updatePersistentPath(ZkUtils.getTopicPath(tp0.topic), zkUtils.replicaAssignmentZkData(expandedAssignment.map(kv => kv._1.toString -> kv._2)))
+    zkClient.setTopicAssignment(tp0.topic, expandedAssignment)
     waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic partition expansion")
     TestUtils.waitUntilMetadataIsPropagated(Seq(servers(controllerId)), tp1.topic, tp1.partition)
@@ -139,16 +139,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val timerCount = timer(metricName).count
 
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(controllerId))
     val reassignment = Map(tp -> Seq(otherBrokerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment))
+    zkClient.createPartitionReassignment(reassignment)
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3,
       "failed to get expected partition state after partition reassignment")
-    TestUtils.waitUntilTrue(() => zkUtils.getReplicaAssignmentForTopics(Seq(tp.topic)) == reassignment,
+    TestUtils.waitUntilTrue(() =>  zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
       "failed to get updated partition assignment on topic znode after partition reassignment")
-    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
       "failed to remove reassign partitions path after completion")
 
     val updatedTimerCount = timer(metricName).count
@@ -160,16 +160,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(controllerId))
     val reassignment = Map(tp -> Seq(otherBrokerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
-    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment))
+    zkClient.setOrCreatePartitionReassignment(reassignment)
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
       "failed to get expected partition state during partition reassignment with offline replica")
-    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
+    TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress(),
       "partition reassignment path should remain while reassignment in progress")
   }
 
@@ -178,21 +178,21 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(controllerId))
     val reassignment = Map(tp -> Seq(otherBrokerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
-    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment))
+    zkClient.createPartitionReassignment(reassignment)
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
       "failed to get expected partition state during partition reassignment with offline replica")
     servers(otherBrokerId).startup()
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4,
       "failed to get expected partition state after partition reassignment")
-    TestUtils.waitUntilTrue(() => zkUtils.getReplicaAssignmentForTopics(Seq(tp.topic)) == reassignment,
+    TestUtils.waitUntilTrue(() => zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
       "failed to get updated partition assignment on topic znode after partition reassignment")
-    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress(),
       "failed to remove reassign partitions path after completion")
   }
 
@@ -201,7 +201,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBroker = servers.find(_.config.brokerId != controllerId).get
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch)
@@ -212,7 +212,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBroker = servers.find(_.config.brokerId != controllerId).get
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch)
@@ -224,13 +224,13 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
-    zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, ZkUtils.preferredReplicaLeaderElectionZkData(Set(tp)))
-    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath),
+    zkClient.createPreferredReplicaElection(Set(tp))
+    TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path),
       "failed to remove preferred replica leader election path after giving up")
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
       "failed to get expected partition state upon broker shutdown")
@@ -241,7 +241,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2, autoLeaderRebalanceEnable = true)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(1, 0))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     servers(otherBrokerId).shutdown()
@@ -258,7 +258,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
@@ -266,7 +266,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
+      val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
         leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
@@ -278,7 +278,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers = makeServers(2, uncleanLeaderElectionEnable = true)
     val controllerId = TestUtils.waitUntilControllerElected(zkClient)
     val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
-    val tp = TopicAndPartition("t", 0)
+    val tp = new TopicPartition("t", 0)
     val assignment = Map(tp.partition -> Seq(otherBrokerId))
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
@@ -286,39 +286,39 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     servers(1).shutdown()
     servers(1).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
+      val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
         leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
     }, "failed to get expected partition state after entire isr went offline")
   }
 
-  private def preferredReplicaLeaderElection(controllerId: Int, otherBroker: KafkaServer, tp: TopicAndPartition,
+  private def preferredReplicaLeaderElection(controllerId: Int, otherBroker: KafkaServer, tp: TopicPartition,
                                              replicas: Set[Int], leaderEpoch: Int): Unit = {
     otherBroker.shutdown()
     otherBroker.awaitShutdown()
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, leaderEpoch + 1,
       "failed to get expected partition state upon broker shutdown")
     otherBroker.startup()
-    TestUtils.waitUntilTrue(() => zkUtils.getInSyncReplicasForPartition(tp.topic, tp.partition).toSet == replicas, "restarted broker failed to join in-sync replicas")
-    zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, ZkUtils.preferredReplicaLeaderElectionZkData(Set(tp)))
-    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath),
+    TestUtils.waitUntilTrue(() => zkClient.getInSyncReplicasForPartition(new TopicPartition(tp.topic, tp.partition)).get.toSet == replicas, "restarted broker failed to join in-sync replicas")
+    zkClient.createPreferredReplicaElection(Set(tp))
+    TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path),
       "failed to remove preferred replica leader election path after completion")
     waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBroker.config.brokerId, leaderEpoch + 2,
       "failed to get expected partition state upon broker startup")
   }
 
   private def waitUntilControllerEpoch(epoch: Int, message: String): Unit = {
-    TestUtils.waitUntilTrue(() => zkUtils.readDataMaybeNull(ZkUtils.ControllerEpochPath)._1.map(_.toInt) == Some(epoch), message)
+    TestUtils.waitUntilTrue(() => zkClient.getControllerEpoch.get._1 == epoch, message)
   }
 
-  private def waitForPartitionState(tp: TopicAndPartition,
+  private def waitForPartitionState(tp: TopicPartition,
                                     controllerEpoch: Int,
                                     leader: Int,
                                     leaderEpoch: Int,
                                     message: String): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
+      val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), controllerEpoch, leader, leaderEpoch)
     }, message)
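
As the updated waitForPartitionState helper above shows, per-partition state is
now read through KafkaZkClient.getTopicPartitionStates, which returns a map
keyed by TopicPartition. A hedged sketch of that read path (assuming a
connected zkClient and an existing partition):

import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("t", 0)
val stateByPartition = zkClient.getTopicPartitionStates(Seq(tp))
stateByPartition.get(tp).foreach { leaderIsrAndControllerEpoch =>
  val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
  // The same fields the tests assert on: leader, leader epoch and ISR.
  println(s"leader=${leaderAndIsr.leader} epoch=${leaderAndIsr.leaderEpoch} isr=${leaderAndIsr.isr}")
}
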
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 617b326..0a8c49f 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -19,15 +19,16 @@ package kafka.integration
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import org.junit.{Test, After, Before}
+
+import org.junit.{After, Before, Test}
 
 import scala.collection._
 import org.junit.Assert._
-
 import kafka.cluster._
 import kafka.server._
 import kafka.consumer._
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import org.apache.kafka.common.security.JaasUtils
 
 @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class FetcherTest extends KafkaServerTestHarness {
@@ -39,10 +40,13 @@ class FetcherTest extends KafkaServerTestHarness {
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
 
   var fetcher: ConsumerFetcherManager = null
+  var zkUtils: ZkUtils = null
 
   @Before
   override def setUp() {
     super.setUp
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
+
     createTopic(topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)))
 
     val cluster = new Cluster(servers.map { s =>
@@ -65,6 +69,8 @@ class FetcherTest extends KafkaServerTestHarness {
   @After
   override def tearDown() {
     fetcher.stopConnections()
+    if (zkUtils != null)
+     CoreUtils.swallow(zkUtils.close(), this)
     super.tearDown
   }
 
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 033ca67..19fa19d 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -18,20 +18,22 @@
 package kafka.security.auth
 
 import kafka.admin.ZkSecurityMigrator
-import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.{ACL}
+import org.apache.zookeeper.data.ACL
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+
 import scala.collection.JavaConverters._
-import scala.util.{Try, Success, Failure}
+import scala.util.{Failure, Success, Try}
 import javax.security.auth.login.Configuration
 
 class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
   val authProvider = "zookeeper.authProvider.1"
+  var zkUtils: ZkUtils = null
 
   @Before
   override def setUp() {
@@ -39,10 +41,13 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     Configuration.setConfiguration(null)
     System.setProperty(authProvider, "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
     super.setUp()
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
   }
 
   @After
   override def tearDown() {
+    if (zkUtils != null)
+     CoreUtils.swallow(zkUtils.close(), this)
     super.tearDown()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     System.clearProperty(authProvider)
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 6768b84..751053d 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -74,7 +74,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker
     val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
 
-    val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
+    val leaderEpoch1 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
     debug("leader Epoch: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1))
     // NOTE: this is to avoid transient test failures
@@ -86,7 +86,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     // check if leader moves to the other server
     val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
                                                     oldLeaderOpt = if (leader1 == 0) None else Some(leader1))
-    val leaderEpoch2 = zkUtils.getEpochForPartition(topic, partitionId)
+    val leaderEpoch2 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
     debug("Leader is elected to be: %s".format(leader1))
     debug("leader Epoch: " + leaderEpoch2)
     assertEquals("Leader must move to broker 0", 0, leader2)
@@ -100,7 +100,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     Thread.sleep(zookeeper.tickTime)
     val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
                                                     oldLeaderOpt = if (leader2 == 1) None else Some(leader2))
-    val leaderEpoch3 = zkUtils.getEpochForPartition(topic, partitionId)
+    val leaderEpoch3 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
     debug("leader Epoch: " + leaderEpoch3)
     debug("Leader is elected to be: %s".format(leader3))
     assertEquals("Leader must return to 1", 1, leader3)
@@ -119,7 +119,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker
     val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
 
-    val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
+    val leaderEpoch1 = zkClient.getEpochForPartition(new TopicPartition(topic, partitionId)).get
     debug("leader Epoch: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1))
     // NOTE: this is to avoid transient test failures
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index ba33ab0..2087363 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -24,7 +24,7 @@ import kafka.server.LogDirFailureTest._
 import kafka.api.IntegrationTestHarness
 import kafka.controller.{OfflineReplica, PartitionAndReplica}
 import kafka.utils.{CoreUtils, Exit, TestUtils}
-import kafka.zk.LogDirEventNotificationZNode
+
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
@@ -191,7 +191,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     }, "Expected some messages", 3000L)
 
     // There should be no remaining LogDirEventNotification znode
-    assertTrue(zkUtils.getChildrenParentMayNotExist(LogDirEventNotificationZNode.path).isEmpty)
+    assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
 
     // The controller should have marked the replica on the original leader as offline
     val controllerServer = servers.find(_.kafkaController.isActive).get
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 29a1fa6..2fa6600 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -189,12 +189,4 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     }
     true
   }
-
-  @Test
-  def testGetSequenceIdMethod() {
-    val path = "/test/seqid"
-    (1 to 10).foreach { seqid =>
-      assertEquals(seqid, zkUtils.getSequenceId(path))
-    }
-  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index 54ba887..0317da3 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -49,7 +49,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
   @Test
   def testAutoGenerateClusterId() {
     // Make sure that the cluster id doesn't exist yet.
-    assertFalse(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+    assertFalse(zkClient.getClusterId.isDefined)
 
     var server1 = TestUtils.createServer(config1)
     servers = Seq(server1)
@@ -61,7 +61,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
     server1.shutdown()
 
     // Make sure that the cluster id is persistent.
-    assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+    assertTrue(zkClient.getClusterId.isDefined)
     assertEquals(zkClient.getClusterId, Some(clusterIdOnFirstBoot))
 
     // Restart the server check to confirm that it uses the clusterId generated previously
@@ -74,7 +74,7 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
     server1.shutdown()
 
     // Make sure that the cluster id is persistent after multiple reboots.
-    assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+    assertTrue(zkClient.getClusterId.isDefined)
     assertEquals(zkClient.getClusterId, Some(clusterIdOnFirstBoot))
 
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
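Existence and value checks both collapse into KafkaZkClient.getClusterId, which returns
an Option that is defined once a broker has generated and registered an id. A condensed
sketch of the pattern, assuming the harness zkClient; comments mark the elided steps:

    // Sketch: Option-based cluster id checks replacing pathExists/readData on the cluster id path.
    assert(zkClient.getClusterId.isEmpty, "no cluster id expected before first boot")
    // ... start a broker, which generates and persists the cluster id ...
    val clusterId = zkClient.getClusterId
    assert(clusterId.isDefined, "cluster id should survive broker shutdown")
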
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 3414c36..4c05d98 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -45,7 +45,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
     server = TestUtils.createServer(KafkaConfig.fromProps(props))
 
-    val pathExists = zkUtils.pathExists(zookeeperChroot)
+    val pathExists = zkClient.pathExists(zookeeperChroot)
     assertTrue(pathExists)
   }
 
@@ -76,7 +76,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     val brokerId = 0
     val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect)
     server = TestUtils.createServer(KafkaConfig.fromProps(props1))
-    val brokerRegistration = zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1
+    val brokerRegistration = zkClient.getBroker(brokerId).getOrElse(fail("broker doesn't exist"))
 
     val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect)
     try {
@@ -88,7 +88,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     }
 
     // broker registration shouldn't change
-    assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
+    assertEquals(brokerRegistration, zkClient.getBroker(brokerId).getOrElse(fail("broker doesn't exist")))
   }
 
   @Test
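Broker registrations are likewise read back as typed values: getBroker returns an Option
that is None when the id is not registered. A minimal sketch, assuming the harness
zkClient; the broker id and failure message are illustrative:

    // Sketch: typed broker lookup replacing readData on the /brokers/ids/<id> path.
    val brokerId = 0
    val registration = zkClient.getBroker(brokerId)
      .getOrElse(throw new AssertionError(s"broker $brokerId is not registered"))
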
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 3a42c3b..3389161 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -19,12 +19,12 @@ package kafka.utils
 
 import kafka.server.{KafkaConfig, ReplicaFetcherManager}
 import kafka.api.LeaderAndIsr
-import kafka.zk.ZooKeeperTestHarness
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.{IsrChangeNotificationZNode, TopicZNode, ZooKeeperTestHarness}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.easymock.EasyMock
-import scala.collection.JavaConverters._
 
 class ReplicationUtilsTest extends ZooKeeperTestHarness {
   private val zkVersion = 1
@@ -34,14 +34,15 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
   private val leaderEpoch = 1
   private val controllerEpoch = 1
   private val isr = List(1, 2)
-  private val topicPath = s"/brokers/topics/$topic/partitions/$partition/state"
-  private val topicData = Json.encodeAsString(Map("controller_epoch" -> controllerEpoch, "leader" -> leader,
-    "versions" -> 1, "leader_epoch" -> leaderEpoch, "isr" -> isr).asJava)
 
   @Before
   override def setUp() {
     super.setUp()
-    zkUtils.createPersistentPath(topicPath, topicData)
+    zkClient.makeSurePersistentPathExists(TopicZNode.path(topic))
+    val topicPartition = new TopicPartition(topic, partition)
+    val leaderAndIsr = LeaderAndIsr(leader, leaderEpoch, isr, 1)
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    zkClient.createTopicPartitionStatesRaw(Map(topicPartition -> leaderIsrAndControllerEpoch))
   }
 
   @Test
@@ -63,7 +64,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
     EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
     EasyMock.replay(replicaManager)
 
-    zkClient.makeSurePersistentPathExists(ZkUtils.IsrChangeNotificationPath)
+    zkClient.makeSurePersistentPathExists(IsrChangeNotificationZNode.path)
 
     val replicas = List(0, 1)
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 303afa7..636c0bd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -977,24 +977,25 @@ object TestUtils extends Logging {
     file.close()
   }
 
-  def checkForPhantomInSyncReplicas(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
-    val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionToBeReassigned)
+  def checkForPhantomInSyncReplicas(zkClient: KafkaZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
+    val inSyncReplicas = zkClient.getInSyncReplicasForPartition(new TopicPartition(topic, partitionToBeReassigned))
     // in sync replicas should not have any replica that is not in the new assigned replicas
-    val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
+    val phantomInSyncReplicas = inSyncReplicas.get.toSet -- assignedReplicas.toSet
     assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
       phantomInSyncReplicas.isEmpty)
   }
 
-  def ensureNoUnderReplicatedPartitions(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
+  def ensureNoUnderReplicatedPartitions(zkClient: KafkaZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
                                                 servers: Seq[KafkaServer]) {
+    val topicPartition = new TopicPartition(topic, partitionToBeReassigned)
     TestUtils.waitUntilTrue(() => {
-        val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionToBeReassigned)
-        inSyncReplicas.size == assignedReplicas.size
+        val inSyncReplicas = zkClient.getInSyncReplicasForPartition(topicPartition)
+        inSyncReplicas.get.size == assignedReplicas.size
       },
       "Reassigned partition [%s,%d] is under replicated".format(topic, partitionToBeReassigned))
     var leader: Option[Int] = None
     TestUtils.waitUntilTrue(() => {
-        leader = zkUtils.getLeaderForPartition(topic, partitionToBeReassigned)
+        leader = zkClient.getLeaderForPartition(topicPartition)
         leader.isDefined
       },
       "Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
index 9f78124..292db8b 100755
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -21,12 +21,27 @@ import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.JaasUtils
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
 
 class ZkUtilsTest extends ZooKeeperTestHarness {
 
   val path = "/path"
+  var zkUtils: ZkUtils = _
+
+  @Before
+  override def setUp() {
+    super.setUp
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
+  }
+
+  @After
+  override def tearDown() {
+    if (zkUtils != null)
+      CoreUtils.swallow(zkUtils.close(), this)
+    super.tearDown
+  }
 
   @Test
   def testSuccessfulConditionalDeletePath() {
@@ -107,4 +122,11 @@ class ZkUtilsTest extends ZooKeeperTestHarness {
     assertEquals(None, zkUtils.getLeaderIsrAndEpochForPartition(topic, partition + 1))
   }
 
+  @Test
+  def testGetSequenceIdMethod() {
+    val path = "/test/seqid"
+    (1 to 10).foreach { seqid =>
+      assertEquals(seqid, zkUtils.getSequenceId(path))
+    }
+  }
 }
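With the harness no longer constructing a ZkUtils, tests that still exercise the legacy
API manage its lifecycle themselves. The pattern from the hunks above, sketched as a
self-contained hypothetical test class:

    import kafka.utils.{CoreUtils, ZkUtils}
    import kafka.zk.ZooKeeperTestHarness
    import org.apache.kafka.common.security.JaasUtils
    import org.junit.{After, Before}

    // Sketch: per-test ZkUtils lifecycle; close errors are swallowed so tearDown always finishes.
    class LegacyZkApiTest extends ZooKeeperTestHarness {
      var zkUtils: ZkUtils = _

      @Before
      override def setUp() {
        super.setUp()
        zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout,
          zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
      }

      @After
      override def tearDown() {
        if (zkUtils != null)
          CoreUtils.swallow(zkUtils.close(), this)
        super.tearDown()
      }
    }
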
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 6280d97..06ea963 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -21,18 +21,15 @@ import java.lang.Iterable
 import javax.security.auth.login.Configuration
 
 import scala.collection.JavaConverters._
-
 import kafka.consumer.ConsumerConfig
-import kafka.utils.ZkUtils
-import kafka.utils.ZKCheckedEphemeral
-import kafka.utils.TestUtils
+import kafka.utils.{CoreUtils, TestUtils, ZKCheckedEphemeral, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.CreateMode
 import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher
 import org.apache.zookeeper.ZooDefs.Ids
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.junit.{After, Before, Test, Assert}
+import org.junit.{After, Assert, Before, Test}
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 import org.junit.runner.RunWith
@@ -50,7 +47,8 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
   val jaasFile = kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
   val authProvider = "zookeeper.authProvider.1"
   var zkSessionTimeoutMs = 1000
-  
+  var zkUtils: ZkUtils = null
+
   @Before
   override def setUp() {
     if (secure) {
@@ -61,10 +59,13 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
         fail("Secure access not enabled")
     }
     super.setUp
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
   }
   
   @After
   override def tearDown() {
+    if (zkUtils != null)
+      CoreUtils.swallow(zkUtils.close(), this)
     super.tearDown
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     System.clearProperty(authProvider)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index c7c3152..9e72583 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,7 +19,7 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.junit.{After, AfterClass, Before, BeforeClass}
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
@@ -43,7 +43,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
   protected val zkAclsEnabled: Option[Boolean] = None
 
-  var zkUtils: ZkUtils = null
   var zkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
 
@@ -55,7 +54,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled))
     zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
     adminZkClient = new AdminZkClient(zkClient)
@@ -63,8 +61,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
   @After
   def tearDown() {
-    if (zkUtils != null)
-     CoreUtils.swallow(zkUtils.close(), this)
     if (zkClient != null)
      zkClient.close()
     if (zookeeper != null)

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.