Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/26 04:11:19 UTC

[1/3] kafka git commit: MINOR: Rename and change package of async ZooKeeper classes

Repository: kafka
Updated Branches:
  refs/heads/trunk f7f8e1121 -> ab6f848ba
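The patch is mechanical: the controller's KafkaControllerZkUtils becomes kafka.zk.KafkaZkClient, the async client with its request/response and exception types moves to a new kafka.zookeeper package, and "Zookeeper" is respelled "ZooKeeper" in class names and comments. The import delta from PartitionStateMachineTest (shown in full below) summarizes the move:

    -import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
    +import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
    +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
    +import kafka.zookeeper.{CreateResponse, GetDataResponse, ZooKeeperClientException}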


http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 028201f..ca8a0d6 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -68,7 +68,7 @@ class TopicFilterTest extends JUnitSuite {
       topicCount.getTopicCountMap.head._1
     }
     //lets make sure that the JSON strings are escaping as we expect
-    //if they are not then when they get saved to zookeeper and read back out they will be broken on parse
+    //if they are not then when they get saved to ZooKeeper and read back out they will be broken on parse
     assertEquals("-\\\"-", getTopicCountMapKey("-\"-"))
     assertEquals("-\\\\-", getTopicCountMapKey("-\\-"))
     assertEquals("-\\/-", getTopicCountMapKey("-/-"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 296f4a7..81df1e1 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -18,10 +18,12 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper.{CreateResponse, GetDataResponse, ZooKeeperClientException}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
@@ -33,7 +35,7 @@ import scala.collection.mutable
 
 class PartitionStateMachineTest extends JUnitSuite {
   private var controllerContext: ControllerContext = null
-  private var mockZkUtils: KafkaControllerZkUtils = null
+  private var mockZkClient: KafkaZkClient = null
   private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
   private var mockTopicDeletionManager: TopicDeletionManager = null
   private var partitionState: mutable.Map[TopicAndPartition, PartitionState] = null
@@ -49,12 +51,12 @@ class PartitionStateMachineTest extends JUnitSuite {
   def setUp(): Unit = {
     controllerContext = new ControllerContext
     controllerContext.epoch = controllerEpoch
-    mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
+    mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
     mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
     partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]
     partitionStateMachine = new PartitionStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager,
-      mockZkUtils, partitionState, mockControllerBrokerRequestBatch)
+      mockZkClient, partitionState, mockControllerBrokerRequestBatch)
   }
 
   @Test
@@ -82,14 +84,14 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
       .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null)))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
@@ -100,12 +102,12 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
-      .andThrow(new ZookeeperClientException("test"))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+      .andThrow(new ZooKeeperClientException("test"))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(NewPartition, partitionState(partition))
   }
 
@@ -116,12 +118,12 @@ class PartitionStateMachineTest extends JUnitSuite {
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
+    EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
       .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null)))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(NewPartition, partitionState(partition))
   }
 
@@ -150,22 +152,22 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
       Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
@@ -182,22 +184,22 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId))
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
       Seq(brokerId, otherBrokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
@@ -233,23 +235,23 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
-    EasyMock.expect(mockZkUtils.getLogConfigs(Seq.empty, config.originals()))
+    EasyMock.expect(mockZkClient.getLogConfigs(Seq.empty, config.originals()))
       .andReturn((Map(partition.topic -> LogConfig()), Map.empty))
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
-    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlinePartition, partitionState(partition))
   }
 
@@ -263,14 +265,14 @@ class PartitionStateMachineTest extends JUnitSuite {
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
 
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
-      .andThrow(new ZookeeperClientException(""))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
+      .andThrow(new ZooKeeperClientException(""))
 
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OfflinePartition, partitionState(partition))
   }
 
@@ -285,15 +287,15 @@ class PartitionStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.NONODE, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
 
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OfflinePartition, partitionState(partition))
   }
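All of the tests in this file share one EasyMock shape against the renamed client; condensed as a sketch (partitions and partitionStateMachine come from the test fixture built in setUp above, and the canned return value is simplified to an empty Seq for illustration):

    import org.easymock.EasyMock
    import kafka.zk.KafkaZkClient

    val mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])        // 1. create the mock
    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))  // 2. record the call the code under test should make
      .andReturn(Seq.empty)                                               //    ...and the canned response it will receive
    EasyMock.replay(mockZkClient)                                         // 3. switch the mock from record to replay mode
    partitionStateMachine.handleStateChanges(partitions, OnlinePartition,
      Option(OfflinePartitionLeaderElectionStrategy))                     // 4. exercise the state machine
    EasyMock.verify(mockZkClient)                                         // 5. fail if any recorded expectation went unmet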
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 0afe7c2..6363d41 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -18,9 +18,11 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper.GetDataResponse
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
@@ -32,7 +34,7 @@ import scala.collection.mutable
 
 class ReplicaStateMachineTest extends JUnitSuite {
   private var controllerContext: ControllerContext = null
-  private var mockZkUtils: KafkaControllerZkUtils = null
+  private var mockZkClient: KafkaZkClient = null
   private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
   private var mockTopicDeletionManager: TopicDeletionManager = null
   private var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = null
@@ -50,11 +52,11 @@ class ReplicaStateMachineTest extends JUnitSuite {
   def setUp(): Unit = {
     controllerContext = new ControllerContext
     controllerContext.epoch = controllerEpoch
-    mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
+    mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
     mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
     replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
-    replicaStateMachine = new ReplicaStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager, mockZkUtils,
+    replicaStateMachine = new ReplicaStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager, mockZkClient,
       replicaState, mockControllerBrokerRequestBatch)
   }
 
@@ -155,9 +157,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlineReplica, replicaState(replica))
   }
 
@@ -178,19 +180,19 @@ class ReplicaStateMachineTest extends JUnitSuite {
     val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
     val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion + 1)
     val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
-    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
       .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
-    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch))
+    EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
       partition.topic, partition.partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
 
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
     assertEquals(updatedLeaderIsrAndControllerEpoch, controllerContext.partitionLeadershipInfo(partition))
     assertEquals(OfflineReplica, replicaState(replica))
   }
@@ -230,9 +232,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlineReplica, replicaState(replica))
   }
 
@@ -244,9 +246,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, true, callbacks.stopReplicaResponseCallback))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, callbacks)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(ReplicaDeletionStarted, replicaState(replica))
   }
 
@@ -348,9 +350,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
       partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
-    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
-    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
     assertEquals(OnlineReplica, replicaState(replica))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
deleted file mode 100644
index d7b46c7..0000000
--- a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.controller
-
-import java.net.UnknownHostException
-import java.nio.charset.StandardCharsets
-import java.util.UUID
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
-
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.{CreateMode, ZooDefs}
-import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
-import org.junit.{After, Test}
-
-class ZookeeperClientTest extends ZooKeeperTestHarness {
-  private val mockPath = "/foo"
-
-  @After
-  override def tearDown() {
-    super.tearDown()
-    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
-    Configuration.setConfiguration(null)
-  }
-
-  @Test(expected = classOf[UnknownHostException])
-  def testUnresolvableConnectString(): Unit = {
-    new ZookeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
-  }
-
-  @Test(expected = classOf[ZookeeperClientTimeoutException])
-  def testConnectionTimeout(): Unit = {
-    zookeeper.shutdown()
-    new ZookeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
-  }
-
-  @Test
-  def testConnection(): Unit = {
-    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-  }
-
-  @Test
-  def testDeleteNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1))
-    assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
-  }
-
-  @Test
-  def testDeleteExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1))
-    assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode)
-  }
-
-  @Test
-  def testExistsNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath))
-    assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode)
-  }
-
-  @Test
-  def testExistsExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath))
-    assertEquals("Response code for exists should be OK", Code.OK, existsResponse.resultCode)
-  }
-
-  @Test
-  def testGetDataNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath))
-    assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode)
-  }
-
-  @Test
-  def testGetDataExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val data = bytes
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
-      CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath))
-    assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode)
-    assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data)
-  }
-
-  @Test
-  def testSetDataNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
-    assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode)
-  }
-
-  @Test
-  def testSetDataExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val data = bytes
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, data, -1))
-    assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode)
-    val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath))
-    assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode)
-    assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data)
-  }
-
-  @Test
-  def testGetAclNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath))
-    assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode)
-  }
-
-  @Test
-  def testGetAclExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath))
-    assertEquals("Response code for getAcl should be OK", Code.OK, getAclResponse.resultCode)
-    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl)
-  }
-
-  @Test
-  def testSetAclNonExistentZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val setAclResponse = zookeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
-    assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode)
-  }
-
-  @Test
-  def testGetChildrenNonExistentZNode(): Unit = {
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
-    assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode)
-  }
-
-  @Test
-  def testGetChildrenExistingZNode(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
-    assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
-    assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children)
-  }
-
-  @Test
-  def testGetChildrenExistingZNodeWithChildren(): Unit = {
-    import scala.collection.JavaConverters._
-    val child1 = "child1"
-    val child2 = "child2"
-    val child1Path = mockPath + "/" + child1
-    val child2Path = mockPath + "/" + child2
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    val createResponseChild1 = zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode)
-    val createResponseChild2 = zookeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create child2 should be OK", Code.OK, createResponseChild2.resultCode)
-
-    val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
-    assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
-    assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted)
-  }
-
-  @Test
-  def testPipelinedGetData(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    val createResponses = createRequests.map(zookeeperClient.handleRequest)
-    createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode))
-    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x))
-    val getDataResponses = zookeeperClient.handleRequests(getDataRequests)
-    getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData should be OK", Code.OK,
-      getDataResponse.resultCode))
-    getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
-      assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode)
-      assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.data)))
-    }
-  }
-
-  @Test
-  def testMixedPipeline(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
-      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    val getDataRequest = GetDataRequest(mockPath)
-    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1)
-    val responses = zookeeperClient.handleRequests(Seq(getDataRequest, setDataRequest))
-    assertEquals("Response code for getData should be OK", Code.OK, responses.head.resultCode)
-    assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data)
-    assertEquals("Response code for setData should be NONODE", Code.NONODE, responses.last.resultCode)
-  }
-
-  @Test
-  def testZNodeChangeHandlerForCreation(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation(): Unit = {
-        znodeChangeHandlerCountDownLatch.countDown()
-      }
-      override val path: String = mockPath
-    }
-
-    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
-    val responses = zookeeperClient.handleRequests(Seq(existsRequest, createRequest))
-    assertEquals("Response code for exists should be NONODE", Code.NONODE, responses.head.resultCode)
-    assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
-    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  @Test
-  def testZNodeChangeHandlerForDeletion(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleDeletion(): Unit = {
-        znodeChangeHandlerCountDownLatch.countDown()
-      }
-      override val path: String = mockPath
-    }
-
-    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
-    val responses = zookeeperClient.handleRequests(Seq(createRequest, existsRequest))
-    assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
-    assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode)
-    val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1))
-    assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode)
-    assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  @Test
-  def testZNodeChangeHandlerForDataChange(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleDataChange(): Unit = {
-        znodeChangeHandlerCountDownLatch.countDown()
-      }
-      override val path: String = mockPath
-    }
-
-    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsRequest = ExistsRequest(mockPath)
-    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
-    val responses = zookeeperClient.handleRequests(Seq(createRequest, existsRequest))
-    assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
-    assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode)
-    val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
-    assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode)
-    assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  @Test
-  def testZNodeChildChangeHandlerForChildChange(): Unit = {
-    import scala.collection.JavaConverters._
-    val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
-    val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
-      override def handleChildChange(): Unit = {
-        zNodeChildChangeHandlerCountDownLatch.countDown()
-      }
-      override val path: String = mockPath
-    }
-
-    val child1 = "child1"
-    val child1Path = mockPath + "/" + child1
-    val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
-    zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
-    val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath))
-    assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
-    val createResponseChild1 = zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
-    assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode)
-    assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  @Test
-  def testStateChangeHandlerForAuthFailure(): Unit = {
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
-    val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val stateChangeHandler = new StateChangeHandler {
-      override def onAuthFailure(): Unit = {
-        stateChangeHandlerCountDownLatch.countDown()
-      }
-    }
-    new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
-    assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
-  }
-
-  private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
-}
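The deletion above is the other half of the rename: the same 339-line test reappears at the end of this message as core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala (new file mode 100644). For orientation, a condensed sketch of the pipelined request API the test exercises, using the renamed class and assuming it keeps the constructor and methods shown in the deleted code:

    import scala.collection.JavaConverters._
    import org.apache.zookeeper.{CreateMode, ZooDefs}
    import kafka.zookeeper._

    // Assumes a ZooKeeper server reachable at zkConnect; timeouts are in ms.
    val client = new ZooKeeperClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, null)

    // Single synchronous request: create a persistent znode.
    val createResponse = client.handleRequest(CreateRequest("/foo", Array.empty[Byte],
      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))

    // Pipelined batch: responses come back in request order.
    val responses = client.handleRequests(Seq(
      GetDataRequest("/foo"),
      SetDataRequest("/foo", "bar".getBytes, -1)))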

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index fb76ca1..cfc325d 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -86,7 +86,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging {
     for(_ <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
-    // update offset in zookeeper for consumer to jump "forward" in time
+    // update offset in ZooKeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
     val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
     consumerProps.put("auto.offset.reset", resetTo)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index ca17b9a..22a06e7 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -239,7 +239,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
     assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
 
-    //test removing last acl also deletes zookeeper path
+    //test removing last acl also deletes ZooKeeper path
     acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
     changeAclAndVerify(acls, Set.empty[Acl], acls)
     assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
@@ -405,7 +405,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   def testHighConcurrencyDeletionOfResourceAcls() {
     val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
 
-    // Alternate authorizer to keep adding and removing zookeeper path
+    // Alternate authorizer to keep adding and removing ZooKeeper path
     val concurrentFuctions = (0 to 50).map { _ =>
       () => {
         simpleAclAuthorizer.addAcls(Set(acl), resource)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 4196bc1..31d32d2 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -84,7 +84,7 @@ class ClientQuotaManagerTest {
 
   /**
    * Tests parsing for <client-id> quotas.
-   * Quota overrides persisted in Zookeeper in /config/clients/<client-id>, default persisted in /config/clients/<default>
+   * Quota overrides persisted in ZooKeeper in /config/clients/<client-id>, default persisted in /config/clients/<default>
    */
   @Test
   def testClientIdQuotaParsing() {
@@ -97,7 +97,7 @@ class ClientQuotaManagerTest {
 
   /**
    * Tests parsing for <user> quotas.
-   * Quota overrides persisted in Zookeeper in /config/users/<user>, default persisted in /config/users/<default>
+   * Quota overrides persisted in ZooKeeper in /config/users/<user>, default persisted in /config/users/<default>
    */
   @Test
   def testUserQuotaParsing() {
@@ -111,7 +111,7 @@ class ClientQuotaManagerTest {
 
   /**
    * Tests parsing for <user, client-id> quotas.
-   * Quotas persisted in Zookeeper in /config/users/<user>/clients/<client-id>, default in /config/users/<default>/clients/<default>
+   * Quotas persisted in ZooKeeper in /config/users/<user>/clients/<client-id>, default in /config/users/<default>/clients/<default>
    */
   @Test
   def testUserClientIdQuotaParsing() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a86c160..60f403d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package unit.kafka.server
+package kafka.server
 
 import java.lang.{Long => JLong}
 import java.net.InetAddress
@@ -30,7 +30,6 @@ import kafka.log.{Log, TimestampOffset}
 import kafka.network.RequestChannel
 import kafka.security.auth.Authorizer
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server._
 import kafka.utils.{MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnsupportedVersionException

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 8786b19..306dbc0 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -80,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
 
     val logManager = server.getLogManager
@@ -115,7 +115,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
 
     val logManager = server.getLogManager
@@ -154,7 +154,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val topic = topicPartition.split("-").head
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
 
     var offsetChanged = false
@@ -178,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager
@@ -207,7 +207,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
-    // setup brokers in zookeeper as owners of partitions for this test
+    // setup brokers in ZooKeeper as owners of partitions for this test
     AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 9c51a10..fa373f5 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -139,7 +139,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     updateProducer()
 
     leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
-    assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0",
+    assertTrue("Leader must remain on broker 1, in case of ZooKeeper session expiration it can move to broker 0",
       leader == 0 || leader == 1)
 
     assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
@@ -150,7 +150,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     server2.startup()
     updateProducer()
     leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader))
-    assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1",
+    assertTrue("Leader must remain on broker 0, in case of ZooKeeper session expiration it can move to broker 1",
       leader == 0 || leader == 1)
 
     sendMessages(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2920730..b8d0afb 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -74,7 +74,7 @@ object TestUtils extends Logging {
 
   /** Port to use for unit tests that mock/don't require a real ZK server. */
   val MockZkPort = 1
-  /** Zookeeper connection string to use for unit tests that mock/don't require a real ZK server. */
+  /** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK server. */
   val MockZkConnect = "127.0.0.1:" + MockZkPort
 
   private val transactionStatusKey = "transactionStatus"
@@ -273,7 +273,7 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Create a topic in zookeeper.
+   * Create a topic in ZooKeeper.
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
@@ -293,7 +293,7 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Create a topic in zookeeper using a customized replica assignment.
+   * Create a topic in ZooKeeper using a customized replica assignment.
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
@@ -1172,7 +1172,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(() =>
       servers.forall(server => topicPartitions.forall(tp => server.replicaManager.getPartition(tp).isEmpty)),
       "Replica manager's should have deleted all of this topic's partitions")
-    // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
+    // ensure that logs from all replicas are deleted if delete topic is marked successful in ZooKeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.forall(server => topicPartitions.forall(tp => server.getLogManager.getLog(tp).isEmpty)))
     // ensure that topic is removed from all cleaner offsets

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 22465ea..36d477c 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -25,19 +25,19 @@ import java.net.InetSocketAddress
 import kafka.utils.CoreUtils
 import org.apache.kafka.common.utils.Utils
 
-class EmbeddedZookeeper() {
+class EmbeddedZooKeeper() {
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
   val tickTime = 500
-  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
+  val zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime)
   val factory = new NIOServerCnxnFactory()
   private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort)
   factory.configure(addr, 0)
-  factory.startup(zookeeper)
-  val port = zookeeper.getClientPort()
+  factory.startup(zooKeeperServer)
+  val port = zooKeeperServer.getClientPort()
 
   def shutdown() {
-    CoreUtils.swallow(zookeeper.shutdown())
+    CoreUtils.swallow(zooKeeperServer.shutdown())
     CoreUtils.swallow(factory.shutdown())
 
     def isDown(): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 07978b9..0cf836d 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -38,7 +38,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     try {
       zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createPersistentPath(path)
-      fail("Failed to throw ConfigException for missing zookeeper root node")
+      fail("Failed to throw ConfigException for missing ZooKeeper root node")
     } catch {
       case _: ConfigException =>
     }
@@ -62,7 +62,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     try {
       zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.makeSurePersistentPathExists(path)
-      fail("Failed to throw ConfigException for missing zookeeper root node")
+      fail("Failed to throw ConfigException for missing ZooKeeper root node")
     } catch {
       case _: ConfigException =>
     }
@@ -86,7 +86,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     try {
       zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createEphemeralPathExpectConflict(path, "somedata")
-      fail("Failed to throw ConfigException for missing zookeeper root node")
+      fail("Failed to throw ConfigException for missing ZooKeeper root node")
     } catch {
       case _: ConfigException =>
     }
@@ -111,7 +111,7 @@ class ZKPathTest extends ZooKeeperTestHarness {
     try {
       zkUtils.zkPath.resetNamespaceCheckedState
       zkUtils.createSequentialPersistentPath(path)
-      fail("Failed to throw ConfigException for missing zookeeper root node")
+      fail("Failed to throw ConfigException for missing ZooKeeper root node")
     } catch {
       case _: ConfigException =>
     }
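
All four cases in this file exercise the same guard: presumably the connect string carries a chroot whose root znode was never created, so each zkUtils write path is expected to surface a ConfigException rather than create nodes under a missing root. A hypothetical shape of such a connect string:

    // hypothetical: "/no-such-root" has not been created in ZooKeeper
    val zkConnectWithInvalidRoot = s"127.0.0.1:$zkPort/no-such-root"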

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 0a7e631..6bedba3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -41,14 +41,14 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkUtils: ZkUtils = null
-  var zookeeper: EmbeddedZookeeper = null
+  var zookeeper: EmbeddedZooKeeper = null
 
   def zkPort: Int = zookeeper.port
   def zkConnect: String = s"127.0.0.1:$zkPort"
   
   @Before
   def setUp() {
-    zookeeper = new EmbeddedZookeeper()
+    zookeeper = new EmbeddedZooKeeper()
     zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
new file mode 100644
index 0000000..d595221
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zookeeper
+
+import java.net.UnknownHostException
+import java.nio.charset.StandardCharsets
+import java.util.UUID
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
+
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.{CreateMode, ZooDefs}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.junit.{After, Test}
+
+class ZooKeeperClientTest extends ZooKeeperTestHarness {
+  private val mockPath = "/foo"
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+    System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    Configuration.setConfiguration(null)
+  }
+
+  @Test(expected = classOf[UnknownHostException])
+  def testUnresolvableConnectString(): Unit = {
+    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
+  }
+
+  @Test(expected = classOf[ZooKeeperClientTimeoutException])
+  def testConnectionTimeout(): Unit = {
+    zookeeper.shutdown()
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
+  }
+
+  @Test
+  def testConnection(): Unit = {
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+  }
+
+  @Test
+  def testDeleteNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
+    assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
+  }
+
+  @Test
+  def testDeleteExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
+    assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode)
+  }
+
+  @Test
+  def testExistsNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode)
+  }
+
+  @Test
+  def testExistsExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
+    assertEquals("Response code for exists should be OK", Code.OK, existsResponse.resultCode)
+  }
+
+  @Test
+  def testGetDataNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode)
+  }
+
+  @Test
+  def testGetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
+      CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode)
+    assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data)
+  }
+
+  @Test
+  def testSetDataNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
+    assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode)
+  }
+
+  @Test
+  def testSetDataExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val data = bytes
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, data, -1))
+    assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode)
+    val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
+    assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode)
+    assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data)
+  }
+
+  @Test
+  def testGetAclNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode)
+  }
+
+  @Test
+  def testGetAclExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
+    assertEquals("Response code for getAcl should be OK", Code.OK, getAclResponse.resultCode)
+    assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl)
+  }
+
+  @Test
+  def testSetAclNonExistentZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
+    assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode)
+  }
+
+  @Test
+  def testGetChildrenNonExistentZNode(): Unit = {
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode)
+  }
+
+  @Test
+  def testGetChildrenExistingZNode(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
+    assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children)
+  }
+
+  @Test
+  def testGetChildrenExistingZNodeWithChildren(): Unit = {
+    import scala.collection.JavaConverters._
+    val child1 = "child1"
+    val child2 = "child2"
+    val child1Path = mockPath + "/" + child1
+    val child2Path = mockPath + "/" + child2
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    val createResponseChild1 = zooKeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode)
+    val createResponseChild2 = zooKeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child2 should be OK", Code.OK, createResponseChild2.resultCode)
+
+    val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
+    assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted)
+  }
+
+  @Test
+  def testPipelinedGetData(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    val createResponses = createRequests.map(zooKeeperClient.handleRequest)
+    createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode))
+    val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x))
+    val getDataResponses = zooKeeperClient.handleRequests(getDataRequests)
+    getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData should be OK", Code.OK,
+      getDataResponse.resultCode))
+    getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) =>
+      assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode)
+      assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.data)))
+    }
+  }
+
+  @Test
+  def testMixedPipeline(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    val getDataRequest = GetDataRequest(mockPath)
+    val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1)
+    val responses = zooKeeperClient.handleRequests(Seq(getDataRequest, setDataRequest))
+    assertEquals("Response code for getData should be OK", Code.OK, responses.head.resultCode)
+    assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data)
+    assertEquals("Response code for setData should be NONODE", Code.NONODE, responses.last.resultCode)
+  }
+
+  @Test
+  def testZNodeChangeHandlerForCreation(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zooKeeperClient.handleRequests(Seq(existsRequest, createRequest))
+    assertEquals("Response code for exists should be NONODE", Code.NONODE, responses.head.resultCode)
+    assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
+    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDeletion(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleDeletion(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zooKeeperClient.handleRequests(Seq(createRequest, existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
+    assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode)
+    val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
+    assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode)
+    assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleDataChange(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)
+    val responses = zooKeeperClient.handleRequests(Seq(createRequest, existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode)
+    assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode)
+    val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
+    assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode)
+    assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testZNodeChildChangeHandlerForChildChange(): Unit = {
+    import scala.collection.JavaConverters._
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
+      override def handleChildChange(): Unit = {
+        zNodeChildChangeHandlerCountDownLatch.countDown()
+      }
+      override val path: String = mockPath
+    }
+
+    val child1 = "child1"
+    val child1Path = mockPath + "/" + child1
+    val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
+    zooKeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+    val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
+    assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode)
+    val createResponseChild1 = zooKeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
+    assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode)
+    assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  @Test
+  def testStateChangeHandlerForAuthFailure(): Unit = {
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
+    val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val stateChangeHandler = new StateChangeHandler {
+      override def onAuthFailure(): Unit = {
+        stateChangeHandlerCountDownLatch.countDown()
+      }
+    }
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
+    assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
+  }
+
+  private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
+}
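
Condensing the watcher tests above into one call pattern (paths and data are placeholders): register the handler first, then issue a watch-setting request so later events are delivered.

    import scala.collection.JavaConverters._
    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
    client.registerZNodeChangeHandler(new ZNodeChangeHandler {
      override val path: String = "/foo"
      override def handleCreation(): Unit = println(s"created: $path")
    })
    client.handleRequest(ExistsRequest("/foo"))   // sets the watch, since a handler is registered for /foo
    client.handleRequest(CreateRequest("/foo", Array.empty[Byte],
      ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
    // handleCreation() fires once the creation notification arrives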

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 367e489..0145827 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -20,7 +20,7 @@ import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
+import kafka.zk.EmbeddedZooKeeper;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.security.JaasUtils;
@@ -47,7 +47,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
     private static final int TOPIC_CREATION_TIMEOUT = 30000;
     private static final int TOPIC_DELETION_TIMEOUT = 30000;
-    private EmbeddedZookeeper zookeeper = null;
+    private EmbeddedZooKeeper zookeeper = null;
     private final KafkaEmbedded[] brokers;
     private ZkUtils zkUtils = null;
 
@@ -84,7 +84,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     public void start() throws IOException, InterruptedException {
         log.debug("Initiating embedded Kafka cluster startup");
         log.debug("Starting a ZooKeeper instance");
-        zookeeper = new EmbeddedZookeeper();
+        zookeeper = new EmbeddedZooKeeper();
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
 
         zkUtils = ZkUtils.apply(


[2/3] kafka git commit: MINOR: Rename and change package of async ZooKeeper classes

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
deleted file mode 100644
index 0009439..0000000
--- a/core/src/main/scala/kafka/controller/ZookeeperClient.scala
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.controller
-
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
-
-import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
-import kafka.utils.Logging
-import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback}
-import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
-import org.apache.zookeeper.ZooKeeper.States
-import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper}
-
-import scala.collection.JavaConverters._
-
-/**
- * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper.
- *
- * @param connectString comma separated host:port pairs, each corresponding to a zk server
- * @param sessionTimeoutMs session timeout in milliseconds
- * @param connectionTimeoutMs connection timeout in milliseconds
- * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
- */
-class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
-                      stateChangeHandler: StateChangeHandler) extends Logging {
-  this.logIdent = "[ZookeeperClient] "
-  private val initializationLock = new ReentrantReadWriteLock()
-  private val isConnectedOrExpiredLock = new ReentrantLock()
-  private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
-  private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
-  private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
-
-  info(s"Initializing a new session to $connectString.")
-  @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
-  waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
-
-  /**
-   * Send a request and wait for its response. See handle(Seq[AsyncRequest]) for details.
-   *
-   * @param request a single request to send and wait on.
-   * @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse).
-   */
-  def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = {
-    handleRequests(Seq(request)).head
-  }
-
-  /**
-   * Send a pipelined sequence of requests and wait for all of their responses.
-   *
-   * The watch flag on each outgoing request will be set if we've already registered a handler for the
-   * path associated with the request.
-   *
-   * @param requests a sequence of requests to send and wait on.
-   * @return the responses for the requests. If all requests have the same type, the responses will have the respective
-   * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype
-   * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
-   */
-  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) {
-    if (requests.isEmpty)
-      Seq.empty
-    else {
-      val countDownLatch = new CountDownLatch(requests.size)
-      val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
-
-      requests.foreach { request =>
-        send(request) { response =>
-          responseQueue.add(response)
-          countDownLatch.countDown()
-        }
-      }
-      countDownLatch.await()
-      responseQueue.asScala.toBuffer
-    }
-  }
-
-  private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
-    // Safe to cast as we always create a response of the right type
-    def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
-
-    request match {
-      case ExistsRequest(path, ctx) =>
-        zooKeeper.exists(path, shouldWatch(request), new StatCallback {
-          override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
-            callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat))
-        }, ctx.orNull)
-      case GetDataRequest(path, ctx) =>
-        zooKeeper.getData(path, shouldWatch(request), new DataCallback {
-          override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
-            callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat))
-        }, ctx.orNull)
-      case GetChildrenRequest(path, ctx) =>
-        zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
-          override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit =
-            callback(GetChildrenResponse(Code.get(rc), path, Option(ctx),
-              Option(children).map(_.asScala).getOrElse(Seq.empty), stat))
-        }, ctx.orNull)
-      case CreateRequest(path, data, acl, createMode, ctx) =>
-        zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
-          override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit =
-            callback(CreateResponse(Code.get(rc), path, Option(ctx), name))
-        }, ctx.orNull)
-      case SetDataRequest(path, data, version, ctx) =>
-        zooKeeper.setData(path, data, version, new StatCallback {
-          override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
-            callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat))
-        }, ctx.orNull)
-      case DeleteRequest(path, version, ctx) =>
-        zooKeeper.delete(path, version, new VoidCallback {
-          override def processResult(rc: Int, path: String, ctx: Any): Unit =
-            callback(DeleteResponse(Code.get(rc), path, Option(ctx)))
-        }, ctx.orNull)
-      case GetAclRequest(path, ctx) =>
-        zooKeeper.getACL(path, null, new ACLCallback {
-          override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = {
-            callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty),
-              stat))
-        }}, ctx.orNull)
-      case SetAclRequest(path, acl, version, ctx) =>
-        zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
-          override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
-            callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat))
-        }, ctx.orNull)
-    }
-  }
-
-  /**
-   * Wait indefinitely until the underlying zookeeper client reaches the CONNECTED state.
-   * @throws ZookeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
-   * @throws ZookeeperClientExpiredException if the session expired either before or while waiting for connection.
-   */
-  def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
-    waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
-  }
-
-  private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = {
-    info("Waiting until connected.")
-    var nanos = timeUnit.toNanos(timeout)
-    inLock(isConnectedOrExpiredLock) {
-      var state = zooKeeper.getState
-      while (!state.isConnected && state.isAlive) {
-        if (nanos <= 0) {
-          throw new ZookeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state")
-        }
-        nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
-        state = zooKeeper.getState
-      }
-      if (state == States.AUTH_FAILED) {
-        throw new ZookeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
-      } else if (state == States.CLOSED) {
-        throw new ZookeeperClientExpiredException("Session expired either before or while waiting for connection")
-      }
-    }
-    info("Connected.")
-  }
-
-  // If this method is changed, the documentation for registerZNodeChangeHandler and/or registerZNodeChildChangeHandler
-  // may need to be updated.
-  private def shouldWatch(request: AsyncRequest): Boolean = request match {
-    case _: GetChildrenRequest => zNodeChildChangeHandlers.contains(request.path)
-    case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path)
-    case _ => throw new IllegalArgumentException(s"Request $request is not watchable")
-  }
-
-  /**
-   * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher.
-   *
-   * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest])
-   * with either a GetDataRequest or ExistsRequest.
-   *
-   * NOTE: zookeeper only allows registration to a nonexistent znode with ExistsRequest.
-   *
-   * @param zNodeChangeHandler the handler to register
-   */
-  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
-    zNodeChangeHandlers.put(zNodeChangeHandler.path, zNodeChangeHandler)
-  }
-
-  /**
-   * Unregister the handler from ZookeeperClient. This is just a local operation.
-   * @param path the path of the handler to unregister
-   */
-  def unregisterZNodeChangeHandler(path: String): Unit = {
-    zNodeChangeHandlers.remove(path)
-  }
-
-  /**
-   * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher.
-   *
-   * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest]) with a GetChildrenRequest.
-   *
-   * @param zNodeChildChangeHandler the handler to register
-   */
-  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
-    zNodeChildChangeHandlers.put(zNodeChildChangeHandler.path, zNodeChildChangeHandler)
-  }
-
-  /**
-   * Unregister the handler from ZookeeperClient. This is just a local operation.
-   * @param path the path of the handler to unregister
-   */
-  def unregisterZNodeChildChangeHandler(path: String): Unit = {
-    zNodeChildChangeHandlers.remove(path)
-  }
-
-  def close(): Unit = inWriteLock(initializationLock) {
-    info("Closing.")
-    zNodeChangeHandlers.clear()
-    zNodeChildChangeHandlers.clear()
-    zooKeeper.close()
-    info("Closed.")
-  }
-
-  def sessionId: Long = inReadLock(initializationLock) {
-    zooKeeper.getSessionId
-  }
-
-  private def initialize(): Unit = {
-    if (!zooKeeper.getState.isAlive) {
-      info(s"Initializing a new session to $connectString.")
-      var now = System.currentTimeMillis()
-      val threshold = now + connectionTimeoutMs
-      while (now < threshold) {
-        try {
-          zooKeeper.close()
-          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZookeeperClientWatcher)
-          waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
-          return
-        } catch {
-          case _: Exception =>
-            now = System.currentTimeMillis()
-            if (now < threshold) {
-              Thread.sleep(1000)
-              now = System.currentTimeMillis()
-            }
-        }
-      }
-      info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}")
-      stateChangeHandler.onReconnectionTimeout()
-    }
-  }
-
-  private object ZookeeperClientWatcher extends Watcher {
-    override def process(event: WatchedEvent): Unit = {
-      debug("Received event: " + event)
-      Option(event.getPath) match {
-        case None =>
-          inLock(isConnectedOrExpiredLock) {
-            isConnectedOrExpiredCondition.signalAll()
-          }
-          if (event.getState == KeeperState.AuthFailed) {
-            info("Auth failed.")
-            stateChangeHandler.onAuthFailure()
-          } else if (event.getState == KeeperState.Expired) {
-            inWriteLock(initializationLock) {
-              info("Session expired.")
-              stateChangeHandler.beforeInitializingSession()
-              initialize()
-              stateChangeHandler.afterInitializingSession()
-            }
-          }
-        case Some(path) =>
-          (event.getType: @unchecked) match {
-            case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
-            case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
-            case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
-            case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
-          }
-      }
-    }
-  }
-}
-
-trait StateChangeHandler {
-  def beforeInitializingSession(): Unit = {}
-  def afterInitializingSession(): Unit = {}
-  def onAuthFailure(): Unit = {}
-  def onReconnectionTimeout(): Unit = {}
-}
-
-trait ZNodeChangeHandler {
-  val path: String
-  def handleCreation(): Unit = {}
-  def handleDeletion(): Unit = {}
-  def handleDataChange(): Unit = {}
-}
-
-trait ZNodeChildChangeHandler {
-  val path: String
-  def handleChildChange(): Unit = {}
-}
-
-sealed trait AsyncRequest {
-  /**
-   * This type member allows us to define methods that take requests and return responses with the correct types.
-   * See ``ZookeeperClient.handleRequests`` for example.
-   */
-  type Response <: AsyncResponse
-  def path: String
-  def ctx: Option[Any]
-}
-
-case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode,
-                         ctx: Option[Any] = None) extends AsyncRequest {
-  type Response = CreateResponse
-}
-
-case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest {
-  type Response = DeleteResponse
-}
-
-case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
-  type Response = ExistsResponse
-}
-
-case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
-  type Response = GetDataResponse
-}
-
-case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
-  type Response = SetDataResponse
-}
-
-case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
-  type Response = GetAclResponse
-}
-
-case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
-  type Response = SetAclResponse
-}
-
-case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
-  type Response = GetChildrenResponse
-}
-
-sealed trait AsyncResponse {
-  def resultCode: Code
-  def path: String
-  def ctx: Option[Any]
-
-  /** Return None if the result code is OK and KeeperException otherwise. */
-  def resultException: Option[KeeperException] =
-    if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path))
-}
-case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse
-case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse
-case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
-case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat) extends AsyncResponse
-case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
-case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat) extends AsyncResponse
-case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
-case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse
-
-class ZookeeperClientException(message: String) extends RuntimeException(message)
-class ZookeeperClientExpiredException(message: String) extends ZookeeperClientException(message)
-class ZookeeperClientAuthFailedException(message: String) extends ZookeeperClientException(message)
-class ZookeeperClientTimeoutException(message: String) extends ZookeeperClientException(message)
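
The Response type member on AsyncRequest is what lets handleRequest return a precisely typed response; a minimal sketch of the pairing as it now lives in kafka.zookeeper (client is a connected ZooKeeperClient):

    import scala.collection.JavaConverters._
    val created: CreateResponse = client.handleRequest(
      CreateRequest("/foo", Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
    // mixed request types fall back to the common supertype:
    val mixed: Seq[AsyncResponse] =
      client.handleRequests(Seq(GetDataRequest("/foo"), DeleteRequest("/foo", version = -1)))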

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bd7f023..f1e2fc2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,14 +23,14 @@ import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
 import kafka.common.KafkaException
-import kafka.controller.KafkaControllerZkUtils
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
 import kafka.utils._
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.errors.{LogDirNotFoundException, KafkaStorageException}
+import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -102,7 +102,6 @@ class LogManager(logDirs: Seq[File],
 
   loadLogs()
 
-
   // public, so we can access this from kafka.admin.DeleteTopicTest
   val cleaner: LogCleaner =
     if(cleanerConfig.enableCleaner)
@@ -888,7 +887,7 @@ object LogManager {
 
   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
-            zkUtils: KafkaControllerZkUtils,
+            zkClient: KafkaZkClient,
             brokerState: BrokerState,
             kafkaScheduler: KafkaScheduler,
             time: Time,
@@ -897,7 +896,7 @@ object LogManager {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
-    val (topicConfigs, failed) = zkUtils.getLogConfigs(zkUtils.getAllTopicsInCluster, defaultProps)
+    val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
     if (!failed.isEmpty) throw failed.head._2
 
     // read the log configurations from zookeeper
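
The tuple returned by getLogConfigs splits successful lookups from failures, so the call site above can fail fast on the first broken topic config:

    val (topicConfigs, failures) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
    if (failures.nonEmpty) throw failures.head._2   // mirrors LogManager.apply above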

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 101e646..d576206 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -27,7 +27,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_9_0
 import kafka.cluster.Broker
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
-import kafka.controller.{KafkaController, KafkaControllerZkUtils, StateChangeHandler, ZookeeperClient}
+import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogConfig, LogManager}
@@ -36,6 +36,8 @@ import kafka.network.SocketServer
 import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
+import kafka.zk.KafkaZkClient
+import kafka.zookeeper.{StateChangeHandler, ZooKeeperClient}
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
@@ -135,7 +137,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   var quotaManagers: QuotaFactory.QuotaManagers = null
 
   var zkUtils: ZkUtils = null
-  var kafkaControllerZkUtils: KafkaControllerZkUtils = null
+  private var zkClient: KafkaZkClient = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
@@ -219,7 +221,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
-        val zookeeperClient = new ZookeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
+        val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
           config.zkConnectionTimeoutMs, new StateChangeHandler {
             override def onReconnectionTimeout(): Unit = {
               error("Reconnection timeout.")
@@ -233,10 +235,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
             override def beforeInitializingSession(): Unit = kafkaController.expire()
           })
-        kafkaControllerZkUtils = new KafkaControllerZkUtils(zookeeperClient, zkUtils.isSecure)
+        zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure)
 
         /* start log manager */
-        logManager = LogManager(config, initialOfflineDirs, kafkaControllerZkUtils, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
+        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
         logManager.startup()
 
         metadataCache = new MetadataCache(config.brokerId)
@@ -250,7 +252,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, kafkaControllerZkUtils, time, metrics, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix)
         kafkaController.startup()
 
         adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
@@ -561,8 +563,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           CoreUtils.swallow(kafkaController.shutdown())
         if (zkUtils != null)
           CoreUtils.swallow(zkUtils.close())
-        if (kafkaControllerZkUtils != null)
-          CoreUtils.swallow(kafkaControllerZkUtils.close())
+        if (zkClient != null)
+          CoreUtils.swallow(zkClient.close())
 
         if (metrics != null)
           CoreUtils.swallow(metrics.close())
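
The net effect of the renames in this file, condensed into a sketch (handler body abbreviated):

    val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
      config.zkConnectionTimeoutMs, new StateChangeHandler {
        override def beforeInitializingSession(): Unit = kafkaController.expire()
      })
    val zkClient = new KafkaZkClient(zooKeeperClient, zkUtils.isSecure)
    // zkClient now feeds LogManager and KafkaController; close it during shutdown:
    CoreUtils.swallow(zkClient.close())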

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/utils/Json.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index a916875..ad40c49 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -16,9 +16,12 @@
  */
 package kafka.utils
 
+import java.nio.charset.StandardCharsets
+
 import com.fasterxml.jackson.core.JsonProcessingException
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.utils.json.JsonValue
+
 import scala.collection._
 
 /**
@@ -36,6 +39,13 @@ object Json {
     catch { case _: JsonProcessingException => None }
 
   /**
+   * Parse a JSON byte array into a JsonValue if possible. `None` is returned if `input` is not valid JSON.
+   */
+  def parseBytes(input: Array[Byte]): Option[JsonValue] =
+    try Option(mapper.readTree(input)).map(JsonValue(_))
+    catch { case _: JsonProcessingException => None }
+
+  /**
    * Encode an object into a JSON string. This method accepts any type T where
    *   T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
    * Any other type will result in an exception.
@@ -59,4 +69,13 @@ object Json {
     }
   }
 
+  /**
+   * Encode an object into a JSON value in bytes. This method accepts any type T where
+   *   T => null | Boolean | String | Number | Map[String, T] | Array[T] | Iterable[T]
+   * Any other type will result in an exception.
+   *
+   * This method does not properly handle non-ascii characters.
+   */
+  def encodeAsBytes(obj: Any): Array[Byte] = encode(obj).getBytes(StandardCharsets.UTF_8)
+
 }
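
A round-trip sketch of the two new helpers (the map literal is illustrative):

    val bytes = Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> 5))
    Json.parseBytes(bytes) match {
      case Some(js) => println(js)                // parsed JsonValue
      case None     => println("not valid JSON")  // parse failure
    }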

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index cc38667..60c2adf 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -252,6 +252,9 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time)
   }
 }
 
+/**
+ * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead.
+ */
 class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
               val zkConnection: ZkConnection,
               val isSecure: Boolean) extends Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
new file mode 100644
index 0000000..0e48d51
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import java.util.Properties
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.log.LogConfig
+import kafka.server.ConfigType
+import kafka.utils._
+import kafka.zookeeper._
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
+ *
+ * This performs better than [[kafka.utils.ZkUtils]] and should replace it completely, eventually.
+ *
+ * Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.)
+ * and returns instances of classes from the calling packages in some cases. This is not ideal, but it makes it
+ * easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are
+ * in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go.
+ */
+class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging {
+  import KafkaZkClient._
+
+  /**
+   * Gets topic partition states for the given partitions.
+   * @param partitions the partitions for which we want to get states.
+   * @return sequence of GetDataResponses whose contexts are the partitions they are associated with.
+   */
+  def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = {
+    val getDataRequests = partitions.map { partition =>
+      GetDataRequest(TopicPartitionStateZNode.path(partition), ctx = Some(partition))
+    }
+    retryRequestsUntilConnected(getDataRequests)
+  }
+
+  /**
+   * Sets topic partition states for the given partitions.
+   * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
+   * @return sequence of SetDataResponse whose contexts are the partitions they are associated with.
+   */
+  def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
+    val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      val path = TopicPartitionStateZNode.path(partition)
+      val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
+      SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition))
+    }
+    retryRequestsUntilConnected(setDataRequests.toSeq)
+  }
+
+  /**
+   * Creates topic partition state znodes for the given partitions.
+   * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
+   * @return sequence of CreateResponse whose contexts are the partitions they are associated with.
+   */
+  def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
+    createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq)
+    createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq)
+    val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      val path = TopicPartitionStateZNode.path(partition)
+      val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
+      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition))
+    }
+    retryRequestsUntilConnected(createRequests.toSeq)
+  }
+
+  /**
+   * Sets the controller epoch conditioned on the given epochZkVersion.
+   * @param epoch the epoch to set
+   * @param epochZkVersion the expected version number of the epoch znode.
+   * @return SetDataResponse
+   */
+  def setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse = {
+    val setDataRequest = SetDataRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), epochZkVersion)
+    retryRequestUntilConnected(setDataRequest)
+  }
+
+  /**
+   * Creates the controller epoch znode.
+   * @param epoch the epoch to set
+   * @return CreateResponse
+   */
+  def createControllerEpochRaw(epoch: Int): CreateResponse = {
+    val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch),
+      acls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
+    retryRequestUntilConnected(createRequest)
+  }
+
+  /**
+   * Try to update the partition states of multiple partitions in zookeeper.
+   * @param leaderAndIsrs The partition states to update.
+   * @param controllerEpoch The current controller epoch.
+   * @return UpdateLeaderAndIsrResult instance containing per partition results.
+   */
+  def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
+    val successfulUpdates = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
+    val updatesToRetry = mutable.Buffer.empty[TopicAndPartition]
+    val failed = mutable.Map.empty[TopicAndPartition, Exception]
+    val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) }
+    val setDataResponses = try {
+      setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+    } catch {
+      case e: Exception =>
+        leaderAndIsrs.keys.foreach(partition => failed.put(partition, e))
+        return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
+    }
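+    // Classify each response: OK carries the new znode version, BADVERSION signals
+    // a conditional-update conflict worth retrying, anything else is a hard failure.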
+    setDataResponses.foreach { setDataResponse =>
+      val partition = setDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+      if (setDataResponse.resultCode == Code.OK) {
+        val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
+        successfulUpdates.put(partition, updatedLeaderAndIsr)
+      } else if (setDataResponse.resultCode == Code.BADVERSION) {
+        updatesToRetry += partition
+      } else {
+        failed.put(partition, setDataResponse.resultException.get)
+      }
+    }
+    UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
+  }
+
+  /**
+   * Gets log configs that merge local configs with topic-level configs in ZooKeeper.
+   * @param topics The topics to get log configs for.
+   * @param config The local configs.
+   * @return A tuple of two values:
+   *         1. The successfully gathered log configs
+   *         2. Exceptions corresponding to failed log config lookups.
+   */
+  def getLogConfigs(topics: Seq[String], config: java.util.Map[String, AnyRef]):
+      (Map[String, LogConfig], Map[String, Exception]) = {
+    val logConfigs = mutable.Map.empty[String, LogConfig]
+    val failed = mutable.Map.empty[String, Exception]
+    val configResponses = try {
+      getTopicConfigs(topics)
+    } catch {
+      case e: Exception =>
+        topics.foreach(topic => failed.put(topic, e))
+        return (logConfigs.toMap, failed.toMap)
+    }
+    configResponses.foreach { configResponse =>
+      val topic = configResponse.ctx.get.asInstanceOf[String]
+      if (configResponse.resultCode == Code.OK) {
+        val overrides = ConfigEntityZNode.decode(configResponse.data)
+        val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
+        logConfigs.put(topic, logConfig)
+      } else if (configResponse.resultCode == Code.NONODE) {
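+        // The topic has no config override znode; use the local defaults alone.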
+        val logConfig = LogConfig.fromProps(config, new Properties)
+        logConfigs.put(topic, logConfig)
+      } else {
+        failed.put(topic, configResponse.resultException.get)
+      }
+    }
+    (logConfigs.toMap, failed.toMap)
+  }
+
+  /**
+   * Gets all brokers in the cluster.
+   * @return sequence of brokers in the cluster.
+   */
+  def getAllBrokersInCluster: Seq[Broker] = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      val brokerIds = getChildrenResponse.children.map(_.toInt)
+      val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
+      val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+      getDataResponses.flatMap { getDataResponse =>
+        val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
+        if (getDataResponse.resultCode == Code.OK) {
+          Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+        } else if (getDataResponse.resultCode == Code.NONODE) {
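+          // The broker znode disappeared between the children listing and this read
+          // (e.g. the broker shut down); skip it.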
+          None
+        } else {
+          throw getDataResponse.resultException.get
+        }
+      }
+    } else if (getChildrenResponse.resultCode == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
+   * Gets all topics in the cluster.
+   * @return sequence of topics in the cluster.
+   */
+  def getAllTopicsInCluster: Seq[String] = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      getChildrenResponse.children
+    } else if (getChildrenResponse.resultCode == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
+   * Sets the topic znode with the given assignment.
+   * @param topic the topic whose assignment is being set.
+   * @param assignment the partition to replica mapping to set for the given topic
+   * @return SetDataResponse
+   */
+  def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
+    val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), -1)
+    retryRequestUntilConnected(setDataRequest)
+  }
+
+  /**
+   * Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode paths.
+   * @return sequence of znode names (not absolute znode paths).
+   */
+  def getAllLogDirEventNotifications: Seq[String] = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)
+    } else if (getChildrenResponse.resultCode == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
+   * Reads each of the log dir event notifications associated with the given sequence numbers and extracts the broker ids.
+   * @param sequenceNumbers the sequence numbers associated with the log dir event notifications.
+   * @return broker ids associated with the given log dir event notifications.
+   */
+  def getBrokerIdsFromLogDirEvents(sequenceNumbers: Seq[String]): Seq[Int] = {
+    val getDataRequests = sequenceNumbers.map { sequenceNumber =>
+      GetDataRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber))
+    }
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+    getDataResponses.flatMap { getDataResponse =>
+      if (getDataResponse.resultCode == Code.OK) {
+        LogDirEventNotificationSequenceZNode.decode(getDataResponse.data)
+      } else if (getDataResponse.resultCode == Code.NONODE) {
+        None
+      } else {
+        throw getDataResponse.resultException.get
+      }
+    }
+  }
+
+  /**
+   * Deletes all log dir event notifications.
+   */
+  def deleteLogDirEventNotifications(): Unit = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      deleteLogDirEventNotifications(getChildrenResponse.children)
+    } else if (getChildrenResponse.resultCode != Code.NONODE) {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
+   * Deletes the log dir event notifications associated with the given sequence numbers.
+   * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted.
+   */
+  def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
+    val deleteRequests = sequenceNumbers.map { sequenceNumber =>
+      DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1)
+    }
+    retryRequestsUntilConnected(deleteRequests)
+  }
+
+  /**
+   * Gets the assignments for the given topics.
+   * @param topics the topics whose partitions we wish to get the assignments for.
+   * @return the replica assignment for each partition from the given topics.
+   */
+  def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = {
+    val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
+    getDataResponses.flatMap { getDataResponse =>
+      val topic = getDataResponse.ctx.get.asInstanceOf[String]
+      if (getDataResponse.resultCode == Code.OK) {
+        TopicZNode.decode(topic, getDataResponse.data)
+      } else if (getDataResponse.resultCode == Code.NONODE) {
+        Map.empty[TopicAndPartition, Seq[Int]]
+      } else {
+        throw getDataResponse.resultException.get
+      }
+    }.toMap
+  }
+
+  /**
+   * Gets all topics marked for deletion.
+   * @return sequence of topics marked for deletion.
+   */
+  def getTopicDeletions: Seq[String] = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      getChildrenResponse.children
+    } else if (getChildrenResponse.resultCode == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
+   * Removes the given topics from the topics marked for deletion.
+   * @param topics the topics to remove.
+   */
+  def deleteTopicDeletions(topics: Seq[String]): Unit = {
+    val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1))
+    retryRequestsUntilConnected(deleteRequests)
+  }
+
+  /**
+   * Returns all reassignments.
+   * @return the reassignments for each partition.
+   */
+  def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = {
+    val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    if (getDataResponse.resultCode == Code.OK) {
+      ReassignPartitionsZNode.decode(getDataResponse.data)
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      Map.empty[TopicAndPartition, Seq[Int]]
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Sets the partition reassignment znode with the given reassignment.
+   * @param reassignment the reassignment to set on the reassignment znode.
+   * @return SetDataResponse
+   */
+  def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
+    val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), -1)
+    retryRequestUntilConnected(setDataRequest)
+  }
+
+  /**
+   * Creates the partition reassignment znode with the given reassignment.
+   * @param reassignment the reassignment to set on the reassignment znode.
+   * @return CreateResponse
+   */
+  def createPartitionReassignment(reassignment: Map[TopicAndPartition, Seq[Int]]): CreateResponse = {
+    val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
+      acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
+    retryRequestUntilConnected(createRequest)
+  }
+
+  /**
+   * Deletes the partition reassignment znode.
+   */
+  def deletePartitionReassignment(): Unit = {
+    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, -1)
+    retryRequestUntilConnected(deleteRequest)
+  }
+
+  /**
+   * Gets topic partition states for the given partitions.
+   * @param partitions the partitions for which we want to get states.
+   * @return map containing the LeaderIsrAndControllerEpoch of each partition for which we were able to look up the partition state.
+   */
+  def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+    val getDataResponses = getTopicPartitionStatesRaw(partitions)
+    getDataResponses.flatMap { getDataResponse =>
+      val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+      if (getDataResponse.resultCode == Code.OK) {
+        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
+      } else if (getDataResponse.resultCode == Code.NONODE) {
+        None
+      } else {
+        throw getDataResponse.resultException.get
+      }
+    }.toMap
+  }
+
+  /**
+   * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode paths.
+   * @return sequence of znode names (not absolute znode paths).
+   */
+  def getAllIsrChangeNotifications: Seq[String] = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber)
+    } else if (getChildrenResponse.resultCode == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
+   * Reads each of the isr change notifications associated with the given sequence numbers and extracts the partitions.
+   * @param sequenceNumbers the sequence numbers associated with the isr change notifications.
+   * @return partitions associated with the given isr change notifications.
+   */
+  def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = {
+    val getDataRequests = sequenceNumbers.map { sequenceNumber =>
+      GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber))
+    }
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+    getDataResponses.flatMap { getDataResponse =>
+      if (getDataResponse.resultCode == Code.OK) {
+        IsrChangeNotificationSequenceZNode.decode(getDataResponse.data)
+      } else if (getDataResponse.resultCode == Code.NONODE) {
+        None
+      } else {
+        throw getDataResponse.resultException.get
+      }
+    }
+  }
+
+  /**
+   * Deletes all isr change notifications.
+   */
+  def deleteIsrChangeNotifications(): Unit = {
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      deleteIsrChangeNotifications(getChildrenResponse.children)
+    } else if (getChildrenResponse.resultCode != Code.NONODE) {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  /**
+   * Deletes the isr change notifications associated with the given sequence numbers.
+   * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted.
+   */
+  def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
+    val deleteRequests = sequenceNumbers.map { sequenceNumber =>
+      DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), -1)
+    }
+    retryRequestsUntilConnected(deleteRequests)
+  }
+
+  /**
+   * Gets the partitions marked for preferred replica election.
+   * @return the set of partitions marked for preferred replica election.
+   */
+  def getPreferredReplicaElection: Set[TopicAndPartition] = {
+    val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    if (getDataResponse.resultCode == Code.OK) {
+      PreferredReplicaElectionZNode.decode(getDataResponse.data)
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      Set.empty[TopicAndPartition]
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Deletes the preferred replica election znode.
+   */
+  def deletePreferredReplicaElection(): Unit = {
+    val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, -1)
+    retryRequestUntilConnected(deleteRequest)
+  }
+
+  /**
+   * Gets the controller id.
+   * @return optional integer that is Some if the controller znode exists and can be parsed and None otherwise.
+   */
+  def getControllerId: Option[Int] = {
+    val getDataRequest = GetDataRequest(ControllerZNode.path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    if (getDataResponse.resultCode == Code.OK) {
+      ControllerZNode.decode(getDataResponse.data)
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      None
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Deletes the controller znode.
+   */
+  def deleteController(): Unit = {
+    val deleteRequest = DeleteRequest(ControllerZNode.path, -1)
+    retryRequestUntilConnected(deleteRequest)
+  }
+
+  /**
+   * Gets the controller epoch.
+   * @return optional (Int, Stat) that is Some if the controller epoch path exists and None otherwise.
+   */
+  def getControllerEpoch: Option[(Int, Stat)] = {
+    val getDataRequest = GetDataRequest(ControllerEpochZNode.path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    if (getDataResponse.resultCode == Code.OK) {
+      val epoch = ControllerEpochZNode.decode(getDataResponse.data)
+      Option(epoch, getDataResponse.stat)
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      None
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+  /**
+   * Recursively deletes the topic znode.
+   * @param topic the topic whose topic znode we wish to delete.
+   */
+  def deleteTopicZNode(topic: String): Unit = {
+    deleteRecursive(TopicZNode.path(topic))
+  }
+
+  /**
+   * Deletes the topic configs for the given topics.
+   * @param topics the topics whose configs we wish to delete.
+   */
+  def deleteTopicConfigs(topics: Seq[String]): Unit = {
+    val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), -1))
+    retryRequestsUntilConnected(deleteRequests)
+  }
+
+  /**
+   * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data
+   * watcher registrations on paths that might not yet exist.
+   *
+   * @param zNodeChangeHandler the handler to register
+   */
+  def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
+    zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsResponse = retryRequestUntilConnected(ExistsRequest(zNodeChangeHandler.path))
+    if (existsResponse.resultCode != Code.OK && existsResponse.resultCode != Code.NONODE) {
+      throw existsResponse.resultException.get
+    }
+  }
+
+  /**
+   * See ZooKeeperClient.registerZNodeChangeHandler
+   * @param zNodeChangeHandler the handler to register
+   */
+  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
+    zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+  }
+
+  /**
+   * See ZooKeeperClient.unregisterZNodeChangeHandler
+   * @param path the path of the handler to unregister
+   */
+  def unregisterZNodeChangeHandler(path: String): Unit = {
+    zooKeeperClient.unregisterZNodeChangeHandler(path)
+  }
+
+  /**
+   * See ZooKeeperClient.registerZNodeChildChangeHandler
+   * @param zNodeChildChangeHandler the handler to register
+   */
+  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
+    zooKeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+  }
+
+  /**
+   * See ZooKeeperClient.unregisterZNodeChildChangeHandler
+   * @param path the path of the handler to unregister
+   */
+  def unregisterZNodeChildChangeHandler(path: String): Unit = {
+    zooKeeperClient.unregisterZNodeChildChangeHandler(path)
+  }
+
+  /**
+   * Close the underlying ZooKeeperClient.
+   */
+  def close(): Unit = {
+    zooKeeperClient.close()
+  }
+
+  private def deleteRecursive(path: String): Unit = {
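+    // Depth-first delete: remove all children before deleting the node itself.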
+    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
+    if (getChildrenResponse.resultCode == Code.OK) {
+      getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
+      val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, -1))
+      if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
+        throw deleteResponse.resultException.get
+      }
+    } else if (getChildrenResponse.resultCode != Code.NONODE) {
+      throw getChildrenResponse.resultException.get
+    }
+  }
+
+  private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
+    val createRequests = partitions.map { partition =>
+      val path = TopicPartitionZNode.path(partition)
+      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition))
+    }
+    retryRequestsUntilConnected(createRequests)
+  }
+
+  private def createTopicPartitions(topics: Seq[String]) = {
+    val createRequests = topics.map { topic =>
+      val path = TopicPartitionsZNode.path(topic)
+      CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic))
+    }
+    retryRequestsUntilConnected(createRequests)
+  }
+
+  private def getTopicConfigs(topics: Seq[String]) = {
+    val getDataRequests = topics.map { topic =>
+      GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
+    }
+    retryRequestsUntilConnected(getDataRequests)
+  }
+
+  private def acls(path: String) = {
+    import scala.collection.JavaConverters._
+    ZkUtils.defaultAcls(isSecure, path).asScala
+  }
+
+  private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
+    retryRequestsUntilConnected(Seq(request)).head
+  }
+
+  private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
+    val remainingRequests = ArrayBuffer(requests: _*)
+    val responses = new ArrayBuffer[Req#Response]
+    while (remainingRequests.nonEmpty) {
+      val batchResponses = zooKeeperClient.handleRequests(remainingRequests)
+
+      // Only execute slow path if we find a response with CONNECTIONLOSS
+      if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) {
+        val requestResponsePairs = remainingRequests.zip(batchResponses)
+
+        remainingRequests.clear()
+        requestResponsePairs.foreach { case (request, response) =>
+          if (response.resultCode == Code.CONNECTIONLOSS)
+            remainingRequests += request
+          else
+            responses += response
+        }
+
+        if (remainingRequests.nonEmpty)
+          zooKeeperClient.waitUntilConnected()
+      } else {
+        remainingRequests.clear()
+        responses ++= batchResponses
+      }
+    }
+    responses
+  }
+
+  def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
+    val checkedEphemeral = new CheckedEphemeral(path, data)
+    info(s"Creating $path (is it secure? $isSecure)")
+    val code = checkedEphemeral.create()
+    info(s"Result of znode creation at $path is: $code")
+    code match {
+      case Code.OK =>
+      case _ => throw KeeperException.create(code)
+    }
+  }
+
+  private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    def create(): Code = {
+      val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
+      val createResponse = retryRequestUntilConnected(createRequest)
+      val code = createResponse.resultCode
+      if (code == Code.OK) {
+        code
+      } else if (code == Code.NODEEXISTS) {
+        get()
+      } else {
+        error(s"Error while creating ephemeral at $path with return code: $code")
+        code
+      }
+    }
+
+    private def get(): Code = {
+      val getDataRequest = GetDataRequest(path)
+      val getDataResponse = retryRequestUntilConnected(getDataRequest)
+      val code = getDataResponse.resultCode
+      if (code == Code.OK) {
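+        // The znode exists: if another session owns it, report NODEEXISTS; if this
+        // session owns it, the earlier create actually succeeded.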
+        if (getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId) {
+          error(s"Error while creating ephemeral at $path with return code: $code")
+          Code.NODEEXISTS
+        } else {
+          code
+        }
+      } else if (code == Code.NONODE) {
+        info(s"The ephemeral node at $path went away while reading it")
+        create()
+      } else {
+        error(s"Error while creating ephemeral at $path with return code: $code")
+        code
+      }
+    }
+  }
+}
+
+object KafkaZkClient {
+
+  /**
+   * @param successfulPartitions The successfully updated partition states with adjusted znode versions.
+   * @param partitionsToRetry The partitions that we should retry due to a ZooKeeper BADVERSION conflict. Version conflicts
+   *                          can occur if the partition leader updated partition state while the controller attempted to
+   *                          update partition state.
+   * @param failedPartitions Exceptions corresponding to failed partition state updates.
+   */
+  case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicAndPartition, LeaderAndIsr],
+                                      partitionsToRetry: Seq[TopicAndPartition],
+                                      failedPartitions: Map[TopicAndPartition, Exception])
+}
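
(For orientation, a minimal usage sketch of the new client, as one might write in a REPL.
The KafkaZkClient constructor shape is assumed from the fields used above and is not
shown in this hunk; the connect string and timeouts are placeholders.)

    import kafka.zk.KafkaZkClient
    import kafka.zookeeper.{StateChangeHandler, ZooKeeperClient}

    // Hypothetical wiring: a ZooKeeperClient with no-op state change callbacks,
    // wrapped by a KafkaZkClient (assumed constructor: the client plus an isSecure flag).
    val zooKeeperClient = new ZooKeeperClient("localhost:2181", 6000, 10000, new StateChangeHandler {})
    val zkClient = new KafkaZkClient(zooKeeperClient, isSecure = false)
    try {
      val topics = zkClient.getAllTopicsInCluster // Seq[String], empty if /brokers/topics is absent
      val controllerId = zkClient.getControllerId // Option[Int], None if /controller is absent
      println(s"topics=$topics controllerId=$controllerId")
    } finally {
      zkClient.close() // closes the underlying ZooKeeperClient
    }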

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
new file mode 100644
index 0000000..292523c
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
+import kafka.cluster.{Broker, EndPoint}
+import kafka.common.TopicAndPartition
+import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
+import kafka.utils.Json
+import org.apache.zookeeper.data.Stat
+
+import scala.collection.Seq
+
+// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
+
+object ControllerZNode {
+  def path = "/controller"
+  def encode(brokerId: Int, timestamp: Long): Array[Byte] =
+    Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
+  def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
+    js.asJsonObject("brokerid").to[Int]
+  }
+}
+
+object ControllerEpochZNode {
+  def path = "/controller_epoch"
+  def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8)
+  def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).toInt
+}
+
+object ConfigZNode {
+  def path = "/config"
+}
+
+object BrokersZNode {
+  def path = "/brokers"
+}
+
+object BrokerIdsZNode {
+  def path = s"${BrokersZNode.path}/ids"
+  def encode: Array[Byte] = null
+}
+
+object BrokerIdZNode {
+  def path(id: Int) = s"${BrokerIdsZNode.path}/$id"
+  def encode(id: Int,
+             host: String,
+             port: Int,
+             advertisedEndpoints: Seq[EndPoint],
+             jmxPort: Int,
+             rack: Option[String],
+             apiVersion: ApiVersion): Array[Byte] = {
+    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+    Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes(UTF_8)
+  }
+
+  def decode(id: Int, bytes: Array[Byte]): Broker = {
+    Broker.createBroker(id, new String(bytes, UTF_8))
+  }
+}
+
+object TopicsZNode {
+  def path = s"${BrokersZNode.path}/topics"
+}
+
+object TopicZNode {
+  def path(topic: String) = s"${TopicsZNode.path}/$topic"
+  def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+    val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
+    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson))
+  }
+  def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
+    Json.parseBytes(bytes).flatMap { js =>
+      val assignmentJson = js.asJsonObject
+      val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject)
+      partitionsJsonOpt.map { partitionsJson =>
+        partitionsJson.iterator.map { case (partition, replicas) =>
+          TopicAndPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
+        }
+      }
+    }.map(_.toMap).getOrElse(Map.empty)
+  }
+}
+
+object TopicPartitionsZNode {
+  def path(topic: String) = s"${TopicZNode.path(topic)}/partitions"
+}
+
+object TopicPartitionZNode {
+  def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
+}
+
+object TopicPartitionStateZNode {
+  def path(partition: TopicAndPartition) = s"${TopicPartitionZNode.path(partition)}/state"
+  def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
+    val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+    val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+    Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
+      "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
+  }
+  def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
+    Json.parseBytes(bytes).map { js =>
+      val leaderIsrAndEpochInfo = js.asJsonObject
+      val leader = leaderIsrAndEpochInfo("leader").to[Int]
+      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
+      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
+      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
+      val zkPathVersion = stat.getVersion
+      LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)
+    }
+  }
+}
+
+object ConfigEntityTypeZNode {
+  def path(entityType: String) = s"${ConfigZNode.path}/$entityType"
+}
+
+object ConfigEntityZNode {
+  def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName"
+  def encode(config: Properties): Array[Byte] = {
+    import scala.collection.JavaConverters._
+    Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala))
+  }
+  def decode(bytes: Array[Byte]): Option[Properties] = {
+    Json.parseBytes(bytes).map { js =>
+      val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
+      val props = new Properties()
+      configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
+      props
+    }
+  }
+}
+
+object IsrChangeNotificationZNode {
+  def path = "/isr_change_notification"
+}
+
+object IsrChangeNotificationSequenceZNode {
+  val SequenceNumberPrefix = "isr_change_"
+  def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
+  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+    val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
+    Json.encodeAsBytes(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson))
+  }
+
+  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseBytes(bytes).map { js =>
+    val partitionsJson = js.asJsonObject("partitions").asJsonArray
+    partitionsJson.iterator.map { partitionJs =>
+      val partitionJson = partitionJs.asJsonObject
+      val topic = partitionJson("topic").to[String]
+      val partition = partitionJson("partition").to[Int]
+      TopicAndPartition(topic, partition)
+    }
+  }.map(_.toSet).getOrElse(Set.empty)
+
+  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
+}
+
+object LogDirEventNotificationZNode {
+  def path = "/log_dir_event_notification"
+}
+
+object LogDirEventNotificationSequenceZNode {
+  val SequenceNumberPrefix = "log_dir_event_"
+  val LogDirFailureEvent = 1
+  def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
+  def encode(brokerId: Int) =
+    Json.encodeAsBytes(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent))
+  def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
+    js.asJsonObject("broker").to[Int]
+  }
+  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
+}
+
+object AdminZNode {
+  def path = "/admin"
+}
+
+object DeleteTopicsZNode {
+  def path = s"${AdminZNode.path}/delete_topics"
+}
+
+object DeleteTopicsTopicZNode {
+  def path(topic: String) = s"${DeleteTopicsZNode.path}/$topic"
+}
+
+object ReassignPartitionsZNode {
+  def path = s"${AdminZNode.path}/reassign_partitions"
+  def encode(reassignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+    val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) =>
+      Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
+    }
+    Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson))
+  }
+  def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
+    val reassignmentJson = js.asJsonObject
+    val partitionsJsonOpt = reassignmentJson.get("partitions")
+    partitionsJsonOpt.map { partitionsJson =>
+      partitionsJson.asJsonArray.iterator.map { partitionFieldsJs =>
+        val partitionFields = partitionFieldsJs.asJsonObject
+        val topic = partitionFields("topic").to[String]
+        val partition = partitionFields("partition").to[Int]
+        val replicas = partitionFields("replicas").to[Seq[Int]]
+        TopicAndPartition(topic, partition) -> replicas
+      }
+    }
+  }.map(_.toMap).getOrElse(Map.empty)
+}
+
+object PreferredReplicaElectionZNode {
+  def path = s"${AdminZNode.path}/preferred_replica_election"
+  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+    val jsonMap = Map("version" -> 1,
+      "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))
+    Json.encodeAsBytes(jsonMap)
+  }
+  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseBytes(bytes).map { js =>
+    val partitionsJson = js.asJsonObject("partitions").asJsonArray
+    partitionsJson.iterator.map { partitionJs =>
+      val partitionJson = partitionJs.asJsonObject
+      val topic = partitionJson("topic").to[String]
+      val partition = partitionJson("partition").to[Int]
+      TopicAndPartition(topic, partition)
+    }
+  }.map(_.toSet).getOrElse(Set.empty)
+}
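
(A quick round-trip sketch for the codec objects above, using only classes from this
patch; the topic name, replica list, and epoch value are arbitrary examples.)

    import kafka.common.TopicAndPartition
    import kafka.zk.{ControllerEpochZNode, ReassignPartitionsZNode}

    // Encode a reassignment to JSON bytes and decode it back; decode returns
    // Map.empty if the bytes fail to parse.
    val reassignment = Map(TopicAndPartition("test", 0) -> Seq(1, 2, 3))
    val bytes = ReassignPartitionsZNode.encode(reassignment)
    assert(ReassignPartitionsZNode.decode(bytes) == reassignment)

    // The controller epoch is stored as a plain UTF-8 integer string.
    assert(ControllerEpochZNode.decode(ControllerEpochZNode.encode(42)) == 42)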

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
new file mode 100644
index 0000000..0ff34c0
--- /dev/null
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zookeeper
+
+import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
+import kafka.utils.Logging
+import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
+import org.apache.zookeeper.ZooKeeper.States
+import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper}
+
+import scala.collection.JavaConverters._
+
+/**
+ * A ZooKeeper client that encourages pipelined requests.
+ *
+ * @param connectString comma-separated host:port pairs, each corresponding to a ZooKeeper server
+ * @param sessionTimeoutMs session timeout in milliseconds
+ * @param connectionTimeoutMs connection timeout in milliseconds
+ * @param stateChangeHandler state change handler callbacks called by the underlying ZooKeeper client's EventThread.
+ */
+class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
+                      stateChangeHandler: StateChangeHandler) extends Logging {
+  this.logIdent = "[ZooKeeperClient] "
+  private val initializationLock = new ReentrantReadWriteLock()
+  private val isConnectedOrExpiredLock = new ReentrantLock()
+  private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
+  private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
+  private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
+
+  info(s"Initializing a new session to $connectString.")
+  @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
+  waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
+
+  /**
+   * Send a request and wait for its response. See handleRequests(Seq[AsyncRequest]) for details.
+   *
+   * @param request a single request to send and wait on.
+   * @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse).
+   */
+  def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = {
+    handleRequests(Seq(request)).head
+  }
+
+  /**
+   * Send a pipelined sequence of requests and wait for all of their responses.
+   *
+   * The watch flag on each outgoing request will be set if we've already registered a handler for the
+   * path associated with the request.
+   *
+   * @param requests a sequence of requests to send and wait on.
+   * @return the responses for the requests. If all requests have the same type, the responses will have the respective
+   * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype
+   * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]).
+   */
+  def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = inReadLock(initializationLock) {
+    if (requests.isEmpty)
+      Seq.empty
+    else {
+      val countDownLatch = new CountDownLatch(requests.size)
+      val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
+
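+      // Dispatch every request without waiting in between so they pipeline on the
+      // shared session, then block until each async callback has delivered a response.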
+      requests.foreach { request =>
+        send(request) { response =>
+          responseQueue.add(response)
+          countDownLatch.countDown()
+        }
+      }
+      countDownLatch.await()
+      responseQueue.asScala.toBuffer
+    }
+  }
+
+  private def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
+    // Safe to cast as we always create a response of the right type
+    def callback(response: AsyncResponse): Unit = processResponse(response.asInstanceOf[Req#Response])
+
+    request match {
+      case ExistsRequest(path, ctx) =>
+        zooKeeper.exists(path, shouldWatch(request), new StatCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+            callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat))
+        }, ctx.orNull)
+      case GetDataRequest(path, ctx) =>
+        zooKeeper.getData(path, shouldWatch(request), new DataCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit =
+            callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat))
+        }, ctx.orNull)
+      case GetChildrenRequest(path, ctx) =>
+        zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback {
+          override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit =
+            callback(GetChildrenResponse(Code.get(rc), path, Option(ctx),
+              Option(children).map(_.asScala).getOrElse(Seq.empty), stat))
+        }, ctx.orNull)
+      case CreateRequest(path, data, acl, createMode, ctx) =>
+        zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit =
+            callback(CreateResponse(Code.get(rc), path, Option(ctx), name))
+        }, ctx.orNull)
+      case SetDataRequest(path, data, version, ctx) =>
+        zooKeeper.setData(path, data, version, new StatCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+            callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat))
+        }, ctx.orNull)
+      case DeleteRequest(path, version, ctx) =>
+        zooKeeper.delete(path, version, new VoidCallback {
+          override def processResult(rc: Int, path: String, ctx: Any): Unit =
+            callback(DeleteResponse(Code.get(rc), path, Option(ctx)))
+        }, ctx.orNull)
+      case GetAclRequest(path, ctx) =>
+        zooKeeper.getACL(path, null, new ACLCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = {
+            callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty),
+              stat))
+        }}, ctx.orNull)
+      case SetAclRequest(path, acl, version, ctx) =>
+        zooKeeper.setACL(path, acl.asJava, version, new StatCallback {
+          override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
+            callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat))
+        }, ctx.orNull)
+    }
+  }
+
+  /**
+   * Wait indefinitely until the underlying ZooKeeper client reaches the CONNECTED state.
+   * @throws ZooKeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
+   * @throws ZooKeeperClientExpiredException if the session expired either before or while waiting for connection.
+   */
+  def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
+    waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
+  }
+
+  private def waitUntilConnected(timeout: Long, timeUnit: TimeUnit): Unit = {
+    info("Waiting until connected.")
+    var nanos = timeUnit.toNanos(timeout)
+    inLock(isConnectedOrExpiredLock) {
+      var state = zooKeeper.getState
+      while (!state.isConnected && state.isAlive) {
+        if (nanos <= 0) {
+          throw new ZooKeeperClientTimeoutException(s"Timed out waiting for connection while in state: $state")
+        }
+        nanos = isConnectedOrExpiredCondition.awaitNanos(nanos)
+        state = zooKeeper.getState
+      }
+      if (state == States.AUTH_FAILED) {
+        throw new ZooKeeperClientAuthFailedException("Auth failed either before or while waiting for connection")
+      } else if (state == States.CLOSED) {
+        throw new ZooKeeperClientExpiredException("Session expired either before or while waiting for connection")
+      }
+    }
+    info("Connected.")
+  }
+
+  // If this method is changed, the documentation for registerZNodeChangeHandler and/or registerZNodeChildChangeHandler
+  // may need to be updated.
+  private def shouldWatch(request: AsyncRequest): Boolean = request match {
+    case _: GetChildrenRequest => zNodeChildChangeHandlers.contains(request.path)
+    case _: ExistsRequest | _: GetDataRequest => zNodeChangeHandlers.contains(request.path)
+    case _ => throw new IllegalArgumentException(s"Request $request is not watchable")
+  }
+
+  /**
+   * Register the handler with ZooKeeperClient. This is just a local operation and does not actually register a watcher.
+   *
+   * The watcher is only registered once the user calls handleRequest(AsyncRequest) or handleRequests(Seq[AsyncRequest])
+   * with either a GetDataRequest or ExistsRequest.
+   *
+   * NOTE: ZooKeeper only allows registering a watch on a nonexistent znode via an ExistsRequest.
+   *
+   * @param zNodeChangeHandler the handler to register
+   */
+  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
+    zNodeChangeHandlers.put(zNodeChangeHandler.path, zNodeChangeHandler)
+  }
+
+  /**
+   * Unregister the handler from ZooKeeperClient. This is just a local operation.
+   * @param path the path of the handler to unregister
+   */
+  def unregisterZNodeChangeHandler(path: String): Unit = {
+    zNodeChangeHandlers.remove(path)
+  }
+
+  /**
+   * Register the handler with ZooKeeperClient. This is just a local operation and does not actually register a watcher.
+   *
+   * The watcher is only registered once the user calls handleRequest(AsyncRequest) or handleRequests(Seq[AsyncRequest]) with a GetChildrenRequest.
+   *
+   * @param zNodeChildChangeHandler the handler to register
+   */
+  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
+    zNodeChildChangeHandlers.put(zNodeChildChangeHandler.path, zNodeChildChangeHandler)
+  }
+
+  /**
+   * Unregister the handler from ZooKeeperClient. This is just a local operation.
+   * @param path the path of the handler to unregister
+   */
+  def unregisterZNodeChildChangeHandler(path: String): Unit = {
+    zNodeChildChangeHandlers.remove(path)
+  }
+
+  def close(): Unit = inWriteLock(initializationLock) {
+    info("Closing.")
+    zNodeChangeHandlers.clear()
+    zNodeChildChangeHandlers.clear()
+    zooKeeper.close()
+    info("Closed.")
+  }
+
+  def sessionId: Long = inReadLock(initializationLock) {
+    zooKeeper.getSessionId
+  }
+
+  private def initialize(): Unit = {
+    if (!zooKeeper.getState.isAlive) {
+      info(s"Initializing a new session to $connectString.")
+      var now = System.currentTimeMillis()
+      val threshold = now + connectionTimeoutMs
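+      // Keep replacing the ZooKeeper handle until a session is established or the
+      // connection timeout budget is exhausted.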
+      while (now < threshold) {
+        try {
+          zooKeeper.close()
+          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
+          waitUntilConnected(threshold - now, TimeUnit.MILLISECONDS)
+          return
+        } catch {
+          case _: Exception =>
+            now = System.currentTimeMillis()
+            if (now < threshold) {
+              Thread.sleep(1000)
+              now = System.currentTimeMillis()
+            }
+        }
+      }
+      info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}")
+      stateChangeHandler.onReconnectionTimeout()
+    }
+  }
+
+  private object ZooKeeperClientWatcher extends Watcher {
+    override def process(event: WatchedEvent): Unit = {
+      debug("Received event: " + event)
+      Option(event.getPath) match {
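+        // Events without a path are session/connection state changes; events with a
+        // path are znode watch notifications.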
+        case None =>
+          inLock(isConnectedOrExpiredLock) {
+            isConnectedOrExpiredCondition.signalAll()
+          }
+          if (event.getState == KeeperState.AuthFailed) {
+            info("Auth failed.")
+            stateChangeHandler.onAuthFailure()
+          } else if (event.getState == KeeperState.Expired) {
+            inWriteLock(initializationLock) {
+              info("Session expired.")
+              stateChangeHandler.beforeInitializingSession()
+              initialize()
+              stateChangeHandler.afterInitializingSession()
+            }
+          }
+        case Some(path) =>
+          (event.getType: @unchecked) match {
+            case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
+            case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
+            case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
+            case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
+          }
+      }
+    }
+  }
+}
+
+trait StateChangeHandler {
+  def beforeInitializingSession(): Unit = {}
+  def afterInitializingSession(): Unit = {}
+  def onAuthFailure(): Unit = {}
+  def onReconnectionTimeout(): Unit = {}
+}
+
+trait ZNodeChangeHandler {
+  val path: String
+  def handleCreation(): Unit = {}
+  def handleDeletion(): Unit = {}
+  def handleDataChange(): Unit = {}
+}
+
+trait ZNodeChildChangeHandler {
+  val path: String
+  def handleChildChange(): Unit = {}
+}
+
+sealed trait AsyncRequest {
+  /**
+   * This type member allows us to define methods that take requests and return responses with the correct types.
+   * See ``ZooKeeperClient.handleRequests`` for example.
+   */
+  type Response <: AsyncResponse
+  def path: String
+  def ctx: Option[Any]
+}
+
+case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode,
+                         ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = CreateResponse
+}
+
+case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = DeleteResponse
+}
+
+case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = ExistsResponse
+}
+
+case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = GetDataResponse
+}
+
+case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = SetDataResponse
+}
+
+case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = GetAclResponse
+}
+
+case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = SetAclResponse
+}
+
+case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest {
+  type Response = GetChildrenResponse
+}
+
+sealed trait AsyncResponse {
+  def resultCode: Code
+  def path: String
+  def ctx: Option[Any]
+
+  /** Return None if the result code is OK and Some(KeeperException) otherwise. */
+  def resultException: Option[KeeperException] =
+    if (resultCode == Code.OK) None else Some(KeeperException.create(resultCode, path))
+}
+case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse
+case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse
+case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
+case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat) extends AsyncResponse
+case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
+case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat) extends AsyncResponse
+case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse
+case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat) extends AsyncResponse
+
+class ZooKeeperClientException(message: String) extends RuntimeException(message)
+class ZooKeeperClientExpiredException(message: String) extends ZooKeeperClientException(message)
+class ZooKeeperClientAuthFailedException(message: String) extends ZooKeeperClientException(message)
+class ZooKeeperClientTimeoutException(message: String) extends ZooKeeperClientException(message)
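
(To make the typed request/response pairing concrete, a small hypothetical helper
that pipelines reads and keeps only the znodes that exist; readExisting and its
paths argument are illustrative names, not part of this patch.)

    import kafka.zookeeper.{GetDataRequest, ZooKeeperClient}
    import org.apache.zookeeper.KeeperException.Code

    // handleRequests(Seq[GetDataRequest]) is statically typed to return
    // Seq[GetDataResponse] via the Response type member on AsyncRequest.
    def readExisting(client: ZooKeeperClient, paths: Seq[String]): Map[String, Array[Byte]] = {
      val requests = paths.map(path => GetDataRequest(path, ctx = Some(path)))
      client.handleRequests(requests).flatMap { response =>
        if (response.resultCode == Code.OK)
          Some(response.ctx.get.asInstanceOf[String] -> response.data)
        else
          None // e.g. NONODE; callers needing details can inspect response.resultException
      }.toMap
    }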

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index ea306a8..99ddcc3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -47,7 +47,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
 
   /**
    * Checks that everyone can access ZkUtils.SecureZkRootPaths and ZkUtils.SensitiveZkRootPaths
-   * when zookeeper.set.acl=false, even if Zookeeper is SASL-enabled.
+   * when zookeeper.set.acl=false, even if ZooKeeper is SASL-enabled.
    */
   @Test
   def testZkAclsDisabled() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 1ca5500..06ddd66 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -221,7 +221,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     } catch {
       case _: UnknownTopicOrPartitionException => // expected exception
     }
-    // verify delete topic path for test2 is removed from zookeeper
+    // verify delete topic path for test2 is removed from ZooKeeper
     TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
     // verify that topic test is untouched
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index a646ced..7fc2436 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -53,7 +53,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
 
     /*
      * There is no easy way to test purging. Even if we mock kafka time with MockTime, the purging compares kafka time
-     * with the time stored in zookeeper stat and the embedded zookeeper server does not provide a way to mock time.
+     * with the time stored in ZooKeeper stat and the embedded ZooKeeper server does not provide a way to mock time.
      * So to test purging we would have to use Time.SYSTEM.sleep(changeExpirationMs + 1) issue a write and check
      * Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size). However even that the assertion
      * can fail as the second node can be deleted depending on how threads get scheduled.


[3/3] kafka git commit: MINOR: Rename and change package of async ZooKeeper classes

Posted by ju...@apache.org.
MINOR: Rename and change package of async ZooKeeper classes

- kafka.controller.ZookeeperClient -> kafka.zookeeper.ZooKeeperClient
- kafka.controller.ControllerZkUtils -> kafka.zk.KafkaZkClient
- kafka.controller.ZkData -> kafka.zk.ZkData
- Renamed various fields to match new names and for consistency
- A few clean-ups in ZkData
- Document intent

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Onur Karaman <ok...@linkedin.com>, Manikumar Reddy <ma...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #4112 from ijuma/rename-zookeeper-client-and-move-to-zookeper-package


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ab6f848b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ab6f848b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ab6f848b

Branch: refs/heads/trunk
Commit: ab6f848ba6cafaed3d75b54005c954733f0d1735
Parents: f7f8e11
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Oct 25 21:11:16 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 25 21:11:16 2017 -0700

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      |   4 +-
 .../kafka/controller/KafkaController.scala      | 144 ++--
 .../controller/KafkaControllerZkUtils.scala     | 716 ------------------
 .../controller/PartitionStateMachine.scala      |  13 +-
 .../kafka/controller/ReplicaStateMachine.scala  |   9 +-
 .../kafka/controller/TopicDeletionManager.scala |  11 +-
 .../main/scala/kafka/controller/ZkData.scala    | 248 -------
 .../kafka/controller/ZookeeperClient.scala      | 374 ----------
 core/src/main/scala/kafka/log/LogManager.scala  |   9 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  18 +-
 core/src/main/scala/kafka/utils/Json.scala      |  19 +
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   3 +
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 726 +++++++++++++++++++
 core/src/main/scala/kafka/zk/ZkData.scala       | 244 +++++++
 .../scala/kafka/zookeeper/ZooKeeperClient.scala | 374 ++++++++++
 .../api/SaslPlainPlaintextConsumerTest.scala    |   2 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |   2 +-
 .../ZkNodeChangeNotificationListenerTest.scala  |   2 +-
 .../unit/kafka/consumer/TopicFilterTest.scala   |   2 +-
 .../controller/PartitionStateMachineTest.scala  |  70 +-
 .../controller/ReplicaStateMachineTest.scala    |  34 +-
 .../kafka/controller/ZookeeperClientTest.scala  | 339 ---------
 .../kafka/integration/AutoOffsetResetTest.scala |   2 +-
 .../security/auth/SimpleAclAuthorizerTest.scala |   4 +-
 .../kafka/server/ClientQuotaManagerTest.scala   |   6 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |   3 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |  10 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   8 +-
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala |  10 +-
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   |   8 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |   4 +-
 .../kafka/zookeeper/ZooKeeperClientTest.scala   | 339 +++++++++
 .../integration/utils/EmbeddedKafkaCluster.java |   6 +-
 34 files changed, 1902 insertions(+), 1865 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index b4ee1fd..0e34c5a 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -153,11 +153,11 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
     }
 
     override def handleSessionEstablishmentError(error: Throwable) {
-      fatal("Could not establish session with zookeeper", error)
+      fatal("Could not establish session with ZooKeeper", error)
     }
 
     override def handleStateChanged(state: KeeperState) {
-      debug(s"New zookeeper state: ${state}")
+      debug(s"New ZooKeeper state: ${state}")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 1df40b3..d3e6998 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -22,10 +22,12 @@ import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminOperationException
 import kafka.api._
 import kafka.common._
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server._
 import kafka.utils._
+import kafka.zk._
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper.{ZNodeChangeHandler, ZNodeChildChangeHandler}
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -41,7 +43,7 @@ object KafkaController extends Logging {
   val InitialControllerEpochZkVersion = 1
 }
 
-class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   this.logIdent = s"[Controller id=${config.brokerId}] "
 
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
@@ -55,10 +57,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
   private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
     _ => updateMetrics())
 
-  val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkUtils)
+  val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
-  val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
-  val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
 
   private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
   private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
@@ -155,23 +157,23 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
    * This ensures another controller election will be triggered and there will always be an actively serving controller
    */
   def onControllerFailover() {
-    info("Reading controller epoch from zookeeper")
-    readControllerEpochFromZookeeper()
-    info("Incrementing controller epoch in zookeeper")
+    info("Reading controller epoch from ZooKeeper")
+    readControllerEpochFromZooKeeper()
+    info("Incrementing controller epoch in ZooKeeper")
     incrementControllerEpoch()
     info("Registering handlers")
 
     // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
     val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
       isrChangeNotificationHandler)
-    childChangeHandlers.foreach(zkUtils.registerZNodeChildChangeHandler)
+    childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
     val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
-    nodeChangeHandlers.foreach(zkUtils.registerZNodeChangeHandlerAndCheckExistence)
+    nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
 
     info("Deleting log dir event notifications")
-    zkUtils.deleteLogDirEventNotifications()
+    zkClient.deleteLogDirEventNotifications()
     info("Deleting isr change notifications")
-    zkUtils.deleteIsrChangeNotifications()
+    zkClient.deleteIsrChangeNotifications()
     info("Initializing controller context")
     initializeControllerContext()
     info("Fetching topic deletions in progress")
@@ -213,10 +215,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
   def onControllerResignation() {
     debug("Resigning")
     // de-register listeners
-    zkUtils.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
-    zkUtils.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
-    zkUtils.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
-    zkUtils.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
+    zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
+    zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
+    zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
+    zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
 
     // reset topic deletion manager
     topicDeletionManager.reset()
@@ -232,12 +234,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
     unregisterPartitionReassignmentIsrChangeHandlers()
     // shutdown partition state machine
     partitionStateMachine.shutdown()
-    zkUtils.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
+    zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
     unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
-    zkUtils.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
+    zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
     // shutdown replica state machine
     replicaStateMachine.shutdown()
-    zkUtils.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
+    zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
 
     resetControllerContext()
 
@@ -465,7 +467,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
     val partitionReassignmentIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
     reassignedPartitionContext.partitionReassignmentIsrChangeHandler = partitionReassignmentIsrChangeHandler
     // register listener on the leader and isr path to wait until they catch up with the current leader
-    zkUtils.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
+    zkClient.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
   }
 
   def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -536,7 +538,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
   def incrementControllerEpoch(): Unit = {
     val newControllerEpoch = controllerContext.epoch + 1
-    val setDataResponse = zkUtils.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
+    val setDataResponse = zkClient.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
     setDataResponse.resultCode match {
       case Code.OK =>
         controllerContext.epochZkVersion = setDataResponse.stat.getVersion
@@ -545,7 +547,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
         // if path doesn't exist, this is the first controller whose epoch should be 1
         // the following call can still fail if another controller gets elected between checking if the path exists and
         // trying to create the controller epoch path
-        val createResponse = zkUtils.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
+        val createResponse = zkClient.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
         createResponse.resultCode match {
           case Code.OK =>
             controllerContext.epoch = KafkaController.InitialControllerEpoch
@@ -565,10 +567,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
   private def initializeControllerContext() {
     // update controller cache with delete topic information
-    controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster.toSet
-    controllerContext.allTopics = zkUtils.getAllTopicsInCluster.toSet
+    controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
+    controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
     registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
-    controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
+    controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
     controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // update the leader and isr cache for all existing partitions from Zookeeper
@@ -582,7 +584,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
   }
 
   private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
-    val partitionsUndergoingPreferredReplicaElection = zkUtils.getPreferredReplicaElection
+    val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
     // check if they are already completed or topic was deleted
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
@@ -618,7 +620,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
-    val partitionsBeingReassigned = zkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+    val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
     // check if they are already completed or topic was deleted
     val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
@@ -637,7 +639,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
   }
 
   private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
-    val topicsToBeDeleted = zkUtils.getTopicDeletions.toSet
+    val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
     val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
       replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
     }.keySet.map(_.topic)
@@ -661,14 +663,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
   }
 
   def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
-    val leaderIsrAndControllerEpochs = zkUtils.getTopicPartitionStates(partitions)
+    val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
     leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
       controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
     }
   }
 
   private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = {
-    zkUtils.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
+    zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
       replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
     }
   }
@@ -720,7 +722,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
                                                  replicas: Seq[Int]) {
     val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
     partitionsAndReplicasForThisTopic.put(partition, replicas)
-    val setDataResponse = zkUtils.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
+    val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
     if (setDataResponse.resultCode == Code.OK) {
       info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(",")))
       // update the assigned replica list after a successful zookeeper write
@@ -769,13 +771,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
       val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic)
       partitionModificationsHandlers.put(topic, partitionModificationsHandler)
     }
-    partitionModificationsHandlers.values.foreach(zkUtils.registerZNodeChangeHandler)
+    partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
   }
 
   def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
     topics.foreach { topic =>
       partitionModificationsHandlers.remove(topic)
-        .foreach(handler => zkUtils.unregisterZNodeChangeHandler(handler.path))
+        .foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
     }
   }
 
@@ -784,13 +786,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
       case (topicAndPartition, reassignedPartitionsContext) =>
         val partitionReassignmentIsrChangeHandler =
           reassignedPartitionsContext.partitionReassignmentIsrChangeHandler
-        zkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
+        zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
     }
   }
 
-  private def readControllerEpochFromZookeeper() {
+  private def readControllerEpochFromZooKeeper() {
     // initialize the controller epoch and zk version by reading from zookeeper
-    val epochAndStatOpt = zkUtils.getControllerEpoch
+    val epochAndStatOpt = zkClient.getControllerEpoch
     epochAndStatOpt.foreach { case (epoch, stat) =>
       controllerContext.epoch = epoch
       controllerContext.epochZkVersion = stat.getVersion
@@ -803,21 +805,21 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
       // stop watching the ISR changes for this partition
       val partitionReassignmentIsrChangeHandler =
         controllerContext.partitionsBeingReassigned(topicAndPartition).partitionReassignmentIsrChangeHandler
-      zkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
+      zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
     }
     // read the current list of reassigned partitions from zookeeper
-    val partitionsBeingReassigned = zkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+    val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
     // remove this partition from that list
     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
     val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
     if (reassignment.isEmpty) {
       info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
-      zkUtils.deletePartitionReassignment()
+      zkClient.deletePartitionReassignment()
     } else {
-      val setDataResponse = zkUtils.setPartitionReassignmentRaw(reassignment)
+      val setDataResponse = zkClient.setPartitionReassignmentRaw(reassignment)
       if (setDataResponse.resultCode == Code.NONODE) {
-        val createDataResponse = zkUtils.createPartitionReassignment(reassignment)
+        val createDataResponse = zkClient.createPartitionReassignment(reassignment)
         createDataResponse.resultException.foreach(e => throw new AdminOperationException(e))
       } else {
         setDataResponse.resultException.foreach(e => throw new AdminOperationException(e))
@@ -840,7 +842,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
       }
     }
     if (!isTriggeredByAutoRebalance)
-      zkUtils.deletePreferredReplicaElection()
+      zkClient.deletePreferredReplicaElection()
   }
 
   /**
@@ -872,7 +874,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      zkWriteCompleteOrUnnecessary = zkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+      zkWriteCompleteOrUnnecessary = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
         case Some(leaderIsrAndControllerEpoch) =>
           val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
           val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -885,7 +887,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
           val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
           // update the new leadership decision in zookeeper or retry
           val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) =
-            zkUtils.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
+            zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
           if (successfulUpdates.contains(partition)) {
             val finalLeaderAndIsr = successfulUpdates(partition)
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
@@ -1065,7 +1067,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
     def state = ControllerState.ControllerChange
 
     override def process(): Unit = {
-      zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
       elect()
     }
 
@@ -1111,7 +1113,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
   private def triggerControllerMove(): Unit = {
     onControllerResignation()
     activeControllerId = -1
-    zkUtils.deleteController()
+    zkClient.deleteController()
   }
 
   def expire(): Unit = {
@@ -1126,7 +1128,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
   def elect(): Unit = {
     val timestamp = time.milliseconds
-    activeControllerId = zkUtils.getControllerId.getOrElse(-1)
+    activeControllerId = zkClient.getControllerId.getOrElse(-1)
     /*
      * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
      * it's possible that the controller has already been elected when we get here. This check will prevent the following
@@ -1138,14 +1140,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
     }
 
     try {
-      zkUtils.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
+      zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
       info(config.brokerId + " successfully elected as the controller")
       activeControllerId = config.brokerId
       onControllerFailover()
     } catch {
       case _: NodeExistsException =>
         // If someone else has written the path, then
-        activeControllerId = zkUtils.getControllerId.getOrElse(-1)
+        activeControllerId = zkClient.getControllerId.getOrElse(-1)
 
         if (activeControllerId != -1)
           debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
@@ -1163,7 +1165,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       if (!isActive) return
-      val curBrokers = zkUtils.getAllBrokersInCluster.toSet
+      val curBrokers = zkClient.getAllBrokersInCluster.toSet
       val curBrokerIds = curBrokers.map(_.id)
       val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
       val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
@@ -1189,13 +1191,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       if (!isActive) return
-      val topics = zkUtils.getAllTopicsInCluster.toSet
+      val topics = zkClient.getAllTopicsInCluster.toSet
       val newTopics = topics -- controllerContext.allTopics
       val deletedTopics = controllerContext.allTopics -- topics
       controllerContext.allTopics = topics
 
       registerPartitionModificationsHandlers(newTopics.toSeq)
-      val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics)
+      val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
       controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
         !deletedTopics.contains(p._1.topic))
       controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
@@ -1211,13 +1213,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       if (!isActive) return
-      val sequenceNumbers = zkUtils.getAllLogDirEventNotifications
+      val sequenceNumbers = zkClient.getAllLogDirEventNotifications
       try {
-        val brokerIds = zkUtils.getBrokerIdsFromLogDirEvents(sequenceNumbers)
+        val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
         onBrokerLogDirFailure(brokerIds)
       } finally {
         // delete processed children
-        zkUtils.deleteLogDirEventNotifications(sequenceNumbers)
+        zkClient.deleteLogDirEventNotifications(sequenceNumbers)
       }
     }
   }
@@ -1227,7 +1229,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       if (!isActive) return
-      val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(immutable.Set(topic))
+      val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
       val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
         !controllerContext.partitionReplicaAssignment.contains(p._1))
       if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
@@ -1248,12 +1250,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       if (!isActive) return
-      var topicsToBeDeleted = zkUtils.getTopicDeletions.toSet
+      var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
       debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
       val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
       if (nonExistentTopics.nonEmpty) {
         warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
-        zkUtils.deleteTopicDeletions(nonExistentTopics.toSeq)
+        zkClient.deleteTopicDeletions(nonExistentTopics.toSeq)
       }
       topicsToBeDeleted --= nonExistentTopics
       if (config.deleteTopicEnable) {
@@ -1272,7 +1274,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
       } else {
         // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
         info(s"Removing $topicsToBeDeleted since delete topic is disabled")
-        zkUtils.deleteTopicDeletions(topicsToBeDeleted.toSeq)
+        zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq)
       }
     }
   }
@@ -1282,8 +1284,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       if (!isActive) return
-      zkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
-      val partitionReassignment = zkUtils.getPartitionReassignment
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
+      val partitionReassignment = zkClient.getPartitionReassignment
       val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
       partitionsToBeReassigned.foreach { partitionToBeReassigned =>
         if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
@@ -1306,7 +1308,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
       // check if this partition is still being reassigned or not
       controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
         val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
-        zkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+        zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
           case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
             val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
             val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
@@ -1334,16 +1336,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       if (!isActive) return
-      val sequenceNumbers = zkUtils.getAllIsrChangeNotifications
+      val sequenceNumbers = zkClient.getAllIsrChangeNotifications
       try {
-        val partitions = zkUtils.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
+        val partitions = zkClient.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
         if (partitions.nonEmpty) {
           updateLeaderAndIsrCache(partitions)
           processUpdateNotifications(partitions)
         }
       } finally {
         // delete the notifications
-        zkUtils.deleteIsrChangeNotifications(sequenceNumbers)
+        zkClient.deleteIsrChangeNotifications(sequenceNumbers)
       }
     }
 
@@ -1359,8 +1361,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       if (!isActive) return
-      zkUtils.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
-      val partitions = zkUtils.getPreferredReplicaElection
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
+      val partitions = zkClient.getPreferredReplicaElection
       val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
       if (partitionsForTopicsToBeDeleted.nonEmpty) {
         error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
@@ -1375,8 +1377,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       val wasActiveBeforeChange = isActive
-      zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
-      activeControllerId = zkUtils.getControllerId.getOrElse(-1)
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+      activeControllerId = zkClient.getControllerId.getOrElse(-1)
       if (wasActiveBeforeChange && !isActive) {
         onControllerResignation()
       }
@@ -1388,8 +1390,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
 
     override def process(): Unit = {
       val wasActiveBeforeChange = isActive
-      zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
-      activeControllerId = zkUtils.getControllerId.getOrElse(-1)
+      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+      activeControllerId = zkClient.getControllerId.getOrElse(-1)
       if (wasActiveBeforeChange && !isActive) {
         onControllerResignation()
       }

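The election path touched above, condensed into a hedged sketch (method names
as they appear in the hunks; logging and failover callbacks omitted):

    import kafka.zk.{ControllerZNode, KafkaZkClient}
    import org.apache.zookeeper.KeeperException.NodeExistsException

    // Hedged sketch of elect(): try the ephemeral create; if another broker
    // won the race, fall back to reading the winner's id from the znode.
    def tryElect(zkClient: KafkaZkClient, brokerId: Int, timestamp: Long): Int = {
      try {
        zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId, timestamp))
        brokerId // this broker is now the active controller
      } catch {
        case _: NodeExistsException =>
          zkClient.getControllerId.getOrElse(-1) // someone else was elected first
      }
    }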
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
deleted file mode 100644
index bdd8b57..0000000
--- a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
+++ /dev/null
@@ -1,716 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.controller
-
-import java.util.Properties
-
-import kafka.api.LeaderAndIsr
-import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
-import kafka.log.LogConfig
-import kafka.server.ConfigType
-import kafka.utils.{Logging, ZkUtils}
-import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.data.Stat
-import org.apache.zookeeper.{CreateMode, KeeperException}
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean) extends Logging {
-  import KafkaControllerZkUtils._
-
-  /**
-   * Gets topic partition states for the given partitions.
-   * @param partitions the partitions for which we want to get states.
-   * @return sequence of GetDataResponses whose contexts are the partitions they are associated with.
-   */
-  def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = {
-    val getDataRequests = partitions.map { partition =>
-      GetDataRequest(TopicPartitionStateZNode.path(partition), ctx = Some(partition))
-    }
-    retryRequestsUntilConnected(getDataRequests)
-  }
-
-  /**
-   * Sets topic partition states for the given partitions.
-   * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
-   * @return sequence of SetDataResponse whose contexts are the partitions they are associated with.
-   */
-  def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
-    val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
-      val path = TopicPartitionStateZNode.path(partition)
-      val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
-      SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition))
-    }
-    retryRequestsUntilConnected(setDataRequests.toSeq)
-  }
-
-  /**
-   * Creates topic partition state znodes for the given partitions.
-   * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
-   * @return sequence of CreateResponse whose contexts are the partitions they are associated with.
-   */
-  def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
-    createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq)
-    createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq)
-    val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
-      val path = TopicPartitionStateZNode.path(partition)
-      val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
-      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition))
-    }
-    retryRequestsUntilConnected(createRequests.toSeq)
-  }
-
-  /**
-   * Sets the controller epoch conditioned on the given epochZkVersion.
-   * @param epoch the epoch to set
-   * @param epochZkVersion the expected version number of the epoch znode.
-   * @return SetDataResponse
-   */
-  def setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse = {
-    val setDataRequest = SetDataRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), epochZkVersion)
-    retryRequestUntilConnected(setDataRequest)
-  }
-
-  /**
-   * Creates the controller epoch znode.
-   * @param epoch the epoch to set
-   * @return CreateResponse
-   */
-  def createControllerEpochRaw(epoch: Int): CreateResponse = {
-    val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch),
-      acls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
-    retryRequestUntilConnected(createRequest)
-  }
-
-  /**
-   * Try to update the partition states of multiple partitions in zookeeper.
-   * @param leaderAndIsrs The partition states to update.
-   * @param controllerEpoch The current controller epoch.
-   * @return UpdateLeaderAndIsrResult instance containing per partition results.
-   */
-  def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
-    val successfulUpdates = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
-    val updatesToRetry = mutable.Buffer.empty[TopicAndPartition]
-    val failed = mutable.Map.empty[TopicAndPartition, Exception]
-    val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) }
-    val setDataResponses = try {
-      setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
-    } catch {
-      case e: Exception =>
-        leaderAndIsrs.keys.foreach(partition => failed.put(partition, e))
-        return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
-    }
-    setDataResponses.foreach { setDataResponse =>
-      val partition = setDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
-      if (setDataResponse.resultCode == Code.OK) {
-        val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
-        successfulUpdates.put(partition, updatedLeaderAndIsr)
-      } else if (setDataResponse.resultCode == Code.BADVERSION) {
-        updatesToRetry += partition
-      } else {
-        failed.put(partition, setDataResponse.resultException.get)
-      }
-    }
-    UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
-  }
-
-  /**
-   * Get log configs that merge local configs with topic-level configs in zookeeper.
-   * @param topics The topics to get log configs for.
-   * @param config The local configs.
-   * @return A tuple of two values:
-   *         1. The successfully gathered log configs
-   *         2. Exceptions corresponding to failed log config lookups.
-   */
-  def getLogConfigs(topics: Seq[String], config: java.util.Map[String, AnyRef]):
-  (Map[String, LogConfig], Map[String, Exception]) = {
-    val logConfigs = mutable.Map.empty[String, LogConfig]
-    val failed = mutable.Map.empty[String, Exception]
-    val configResponses = try {
-      getTopicConfigs(topics)
-    } catch {
-      case e: Exception =>
-        topics.foreach(topic => failed.put(topic, e))
-        return (logConfigs.toMap, failed.toMap)
-    }
-    configResponses.foreach { configResponse =>
-      val topic = configResponse.ctx.get.asInstanceOf[String]
-      if (configResponse.resultCode == Code.OK) {
-        val overrides = ConfigEntityZNode.decode(configResponse.data)
-        val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
-        logConfigs.put(topic, logConfig)
-      } else if (configResponse.resultCode == Code.NONODE) {
-        val logConfig = LogConfig.fromProps(config, new Properties)
-        logConfigs.put(topic, logConfig)
-      } else {
-        failed.put(topic, configResponse.resultException.get)
-      }
-    }
-    (logConfigs.toMap, failed.toMap)
-  }
-
-  /**
-   * Gets all brokers in the cluster.
-   * @return sequence of brokers in the cluster.
-   */
-  def getAllBrokersInCluster: Seq[Broker] = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      val brokerIds = getChildrenResponse.children.map(_.toInt)
-      val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
-      val getDataResponses = retryRequestsUntilConnected(getDataRequests)
-      getDataResponses.flatMap { getDataResponse =>
-        val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
-        if (getDataResponse.resultCode == Code.OK) {
-          Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
-        } else if (getDataResponse.resultCode == Code.NONODE) {
-          None
-        } else {
-          throw getDataResponse.resultException.get
-        }
-      }
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
-    }
-  }
-
-  /**
-   * Gets all topics in the cluster.
-   * @return sequence of topics in the cluster.
-   */
-  def getAllTopicsInCluster: Seq[String] = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
-    }
-  }
-
-  /**
-   * Sets the topic znode with the given assignment.
-   * @param topic the topic whose assignment is being set.
-   * @param assignment the partition to replica mapping to set for the given topic
-   * @return SetDataResponse
-   */
-  def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
-    val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), -1)
-    retryRequestUntilConnected(setDataRequest)
-  }
-
-  /**
-   * Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path.
-   * @return sequence of znode names and not the absolute znode path.
-   */
-  def getAllLogDirEventNotifications: Seq[String] = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
-    }
-  }
-
-  /**
-   * Reads each of the log dir event notifications associated with the given sequence numbers and extracts the broker ids.
-   * @param sequenceNumbers the sequence numbers associated with the log dir event notifications.
-   * @return broker ids associated with the given log dir event notifications.
-   */
-  def getBrokerIdsFromLogDirEvents(sequenceNumbers: Seq[String]): Seq[Int] = {
-    val getDataRequests = sequenceNumbers.map { sequenceNumber =>
-      GetDataRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber))
-    }
-    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
-    getDataResponses.flatMap { getDataResponse =>
-      if (getDataResponse.resultCode == Code.OK) {
-        LogDirEventNotificationSequenceZNode.decode(getDataResponse.data)
-      } else if (getDataResponse.resultCode == Code.NONODE) {
-        None
-      } else {
-        throw getDataResponse.resultException.get
-      }
-    }
-  }
-
-  /**
-   * Deletes all log dir event notifications.
-   */
-  def deleteLogDirEventNotifications(): Unit = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children)
-    } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
-    }
-  }
-
-  /**
-   * Deletes the log dir event notifications associated with the given sequence numbers.
-   * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted.
-   */
-  def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
-    val deleteRequests = sequenceNumbers.map { sequenceNumber =>
-      DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1)
-    }
-    retryRequestsUntilConnected(deleteRequests)
-  }
-
-  /**
-   * Gets the assignments for the given topics.
-   * @param topics the topics whose partitions we wish to get the assignments for.
-   * @return the replica assignment for each partition from the given topics.
-   */
-  def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = {
-    val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
-    val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
-    getDataResponses.flatMap { getDataResponse =>
-      val topic = getDataResponse.ctx.get.asInstanceOf[String]
-      if (getDataResponse.resultCode == Code.OK) {
-        TopicZNode.decode(topic, getDataResponse.data)
-      } else if (getDataResponse.resultCode == Code.NONODE) {
-        Map.empty[TopicAndPartition, Seq[Int]]
-      } else {
-        throw getDataResponse.resultException.get
-      }
-    }.toMap
-  }
-
-  /**
-   * Get all topics marked for deletion.
-   * @return sequence of topics marked for deletion.
-   */
-  def getTopicDeletions: Seq[String] = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
-    }
-  }
-
-  /**
-   * Remove the given topics from the topics marked for deletion.
-   * @param topics the topics to remove.
-   */
-  def deleteTopicDeletions(topics: Seq[String]): Unit = {
-    val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1))
-    retryRequestsUntilConnected(deleteRequests)
-  }
-
-  /**
-   * Returns all reassignments.
-   * @return the reassignments for each partition.
-   */
-  def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = {
-    val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
-    val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      ReassignPartitionsZNode.decode(getDataResponse.data)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      Map.empty[TopicAndPartition, Seq[Int]]
-    } else {
-      throw getDataResponse.resultException.get
-    }
-  }
-
-  /**
-   * Sets the partition reassignment znode with the given reassignment.
-   * @param reassignment the reassignment to set on the reassignment znode.
-   * @return SetDataResponse
-   */
-  def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
-    val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), -1)
-    retryRequestUntilConnected(setDataRequest)
-  }
-
-  /**
-   * Creates the partition reassignment znode with the given reassignment.
-   * @param reassignment the reassignment to set on the reassignment znode.
-   * @return CreateResponse
-   */
-  def createPartitionReassignment(reassignment: Map[TopicAndPartition, Seq[Int]]): CreateResponse = {
-    val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
-      acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
-    retryRequestUntilConnected(createRequest)
-  }
-
-  /**
-   * Deletes the partition reassignment znode.
-   */
-  def deletePartitionReassignment(): Unit = {
-    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, -1)
-    retryRequestUntilConnected(deleteRequest)
-  }
-
-  /**
-   * Gets topic partition states for the given partitions.
-   * @param partitions the partitions for which we want to get states.
-   * @return map containing LeaderIsrAndControllerEpoch of each partition for which we were able to look up the partition state.
-   */
-  def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
-    val getDataResponses = getTopicPartitionStatesRaw(partitions)
-    getDataResponses.flatMap { getDataResponse =>
-      val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
-      if (getDataResponse.resultCode == Code.OK) {
-        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
-      } else if (getDataResponse.resultCode == Code.NONODE) {
-        None
-      } else {
-        throw getDataResponse.resultException.get
-      }
-    }.toMap
-  }
-
-  /**
-   * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
-   * @return sequence of znode names and not the absolute znode path.
-   */
-  def getAllIsrChangeNotifications: Seq[String] = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber)
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
-    }
-  }
-
-  /**
-   * Reads each of the isr change notifications associated with the given sequence numbers and extracts the partitions.
-   * @param sequenceNumbers the sequence numbers associated with the isr change notifications.
-   * @return partitions associated with the given isr change notifications.
-   */
-  def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = {
-    val getDataRequests = sequenceNumbers.map { sequenceNumber =>
-      GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber))
-    }
-    val getDataResponses = retryRequestsUntilConnected(getDataRequests)
-    getDataResponses.flatMap { getDataResponse =>
-      if (getDataResponse.resultCode == Code.OK) {
-        IsrChangeNotificationSequenceZNode.decode(getDataResponse.data)
-      } else if (getDataResponse.resultCode == Code.NONODE) {
-        None
-      } else {
-        throw getDataResponse.resultException.get
-      }
-    }
-  }
-
-  /**
-   * Deletes all isr change notifications.
-   */
-  def deleteIsrChangeNotifications(): Unit = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      deleteIsrChangeNotifications(getChildrenResponse.children)
-    } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
-    }
-  }
-
-  /**
-   * Deletes the isr change notifications associated with the given sequence numbers.
-   * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted.
-   */
-  def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
-    val deleteRequests = sequenceNumbers.map { sequenceNumber =>
-      DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), -1)
-    }
-    retryRequestsUntilConnected(deleteRequests)
-  }
-
-  /**
-   * Gets the partitions marked for preferred replica election.
-   * @return sequence of partitions.
-   */
-  def getPreferredReplicaElection: Set[TopicAndPartition] = {
-    val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path)
-    val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      PreferredReplicaElectionZNode.decode(getDataResponse.data)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      Set.empty[TopicAndPartition]
-    } else {
-      throw getDataResponse.resultException.get
-    }
-  }
-
-  /**
-   * Deletes the preferred replica election znode.
-   */
-  def deletePreferredReplicaElection(): Unit = {
-    val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, -1)
-    retryRequestUntilConnected(deleteRequest)
-  }
-
-  /**
-   * Gets the controller id.
-   * @return optional integer that is Some if the controller znode exists and can be parsed and None otherwise.
-   */
-  def getControllerId: Option[Int] = {
-    val getDataRequest = GetDataRequest(ControllerZNode.path)
-    val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      ControllerZNode.decode(getDataResponse.data)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      None
-    } else {
-      throw getDataResponse.resultException.get
-    }
-  }
-
-  /**
-   * Deletes the controller znode.
-   */
-  def deleteController(): Unit = {
-    val deleteRequest = DeleteRequest(ControllerZNode.path, -1)
-    retryRequestUntilConnected(deleteRequest)
-  }
-
-  /**
-   * Gets the controller epoch.
-   * @return optional (Int, Stat) that is Some if the controller epoch path exists and None otherwise.
-   */
-  def getControllerEpoch: Option[(Int, Stat)] = {
-    val getDataRequest = GetDataRequest(ControllerEpochZNode.path)
-    val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      val epoch = ControllerEpochZNode.decode(getDataResponse.data)
-      Option(epoch, getDataResponse.stat)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      None
-    } else {
-      throw getDataResponse.resultException.get
-    }
-  }
-
-  /**
-   * Recursively deletes the topic znode.
-   * @param topic the topic whose topic znode we wish to delete.
-   */
-  def deleteTopicZNode(topic: String): Unit = {
-    deleteRecursive(TopicZNode.path(topic))
-  }
-
-  /**
-   * Deletes the topic configs for the given topics.
-   * @param topics the topics whose configs we wish to delete.
-   */
-  def deleteTopicConfigs(topics: Seq[String]): Unit = {
-    val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), -1))
-    retryRequestsUntilConnected(deleteRequests)
-  }
-
-  /**
-   * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data watcher
-   * registrations on paths which might not even exist.
-   *
-   * @param zNodeChangeHandler
-   */
-  def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
-    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val existsResponse = retryRequestUntilConnected(ExistsRequest(zNodeChangeHandler.path))
-    if (existsResponse.resultCode != Code.OK && existsResponse.resultCode != Code.NONODE) {
-      throw existsResponse.resultException.get
-    }
-  }
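-
-  // A minimal usage sketch (hypothetical handler): register the handler first, then probe existence, so the
-  // watch is installed even when the znode has not been created yet.
-  //
-  //   val handler = new ZNodeChangeHandler {
-  //     override val path: String = ControllerZNode.path
-  //     override def handleCreation(): Unit = info(s"$path was created")
-  //   }
-  //   registerZNodeChangeHandlerAndCheckExistence(handler)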
-
-  /**
-   * See ZookeeperClient.registerZNodeChangeHandler.
-   * @param zNodeChangeHandler the handler to register.
-   */
-  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
-    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-  }
-
-  /**
-   * See ZookeeperClient.unregisterZNodeChangeHandler.
-   * @param path the znode path whose change handler should be unregistered.
-   */
-  def unregisterZNodeChangeHandler(path: String): Unit = {
-    zookeeperClient.unregisterZNodeChangeHandler(path)
-  }
-
-  /**
-   * See ZookeeperClient.registerZNodeChildChangeHandler.
-   * @param zNodeChildChangeHandler the handler to register.
-   */
-  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
-    zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
-  }
-
-  /**
-   * See ZookeeperClient.unregisterZNodeChildChangeHandler.
-   * @param path the znode path whose child change handler should be unregistered.
-   */
-  def unregisterZNodeChildChangeHandler(path: String): Unit = {
-    zookeeperClient.unregisterZNodeChildChangeHandler(path)
-  }
-
-  /**
-   * Close the underlying ZookeeperClient.
-   */
-  def close(): Unit = {
-    zookeeperClient.close()
-  }
-
-  private def deleteRecursive(path: String): Unit = {
-    val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
-      val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, -1))
-      if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
-        throw deleteResponse.resultException.get
-      }
-    } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      throw getChildrenResponse.resultException.get
-    }
-  }
-
-  private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
-    val createRequests = partitions.map { partition =>
-      val path = TopicPartitionZNode.path(partition)
-      val data = TopicPartitionZNode.encode
-      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition))
-    }
-    retryRequestsUntilConnected(createRequests)
-  }
-
-  private def createTopicPartitions(topics: Seq[String]) = {
-    val createRequests = topics.map { topic =>
-      val path = TopicPartitionsZNode.path(topic)
-      val data = TopicPartitionsZNode.encode
-      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(topic))
-    }
-    retryRequestsUntilConnected(createRequests)
-  }
-
-  private def getTopicConfigs(topics: Seq[String]) = {
-    val getDataRequests = topics.map { topic =>
-      GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
-    }
-    retryRequestsUntilConnected(getDataRequests)
-  }
-
-  private def acls(path: String) = {
-    import scala.collection.JavaConverters._
-    ZkUtils.defaultAcls(isSecure, path).asScala
-  }
-
-  private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
-    retryRequestsUntilConnected(Seq(request)).head
-  }
-
-  private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
-    val remainingRequests = ArrayBuffer(requests: _*)
-    val responses = new ArrayBuffer[Req#Response]
-    while (remainingRequests.nonEmpty) {
-      val batchResponses = zookeeperClient.handleRequests(remainingRequests)
-
-      // Only execute slow path if we find a response with CONNECTIONLOSS
-      if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) {
-        val requestResponsePairs = remainingRequests.zip(batchResponses)
-
-        remainingRequests.clear()
-        requestResponsePairs.foreach { case (request, response) =>
-          if (response.resultCode == Code.CONNECTIONLOSS)
-            remainingRequests += request
-          else
-            responses += response
-        }
-
-        if (remainingRequests.nonEmpty)
-          zookeeperClient.waitUntilConnected()
-      } else {
-        remainingRequests.clear()
-        responses ++= batchResponses
-      }
-    }
-    responses
-  }
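-
-  // Pipelining sketch (illustrative paths): requests that fail with CONNECTIONLOSS are re-issued once the
-  // client reconnects, while every other result code is returned for the caller to interpret.
-  //
-  //   val requests = Seq("/controller", "/controller_epoch").map(GetDataRequest(_))
-  //   val responses = retryRequestsUntilConnected(requests)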
-
-  def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
-    val checkedEphemeral = new CheckedEphemeral(path, data)
-    info(s"Creating $path (is it secure? $isSecure)")
-    val code = checkedEphemeral.create()
-    info(s"Result of znode creation at $path is: $code")
-    code match {
-      case Code.OK =>
-      case _ => throw KeeperException.create(code)
-    }
-  }
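-
-  // CheckedEphemeral below guards against a previous session's ephemeral znode that has not yet expired:
-  // if create() hits NODEEXISTS, the node is read back and treated as success only when its ephemeral owner
-  // is this client's session id; if the node disappears in between, the create is retried.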
-
-  private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
-    def create(): Code = {
-      val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
-      val createResponse = retryRequestUntilConnected(createRequest)
-      val code = createResponse.resultCode
-      if (code == Code.OK) {
-        code
-      } else if (code == Code.NODEEXISTS) {
-        get()
-      } else {
-        error(s"Error while creating ephemeral at $path with return code: $code")
-        code
-      }
-    }
-
-    private def get(): Code = {
-      val getDataRequest = GetDataRequest(path)
-      val getDataResponse = retryRequestUntilConnected(getDataRequest)
-      val code = getDataResponse.resultCode
-      if (code == Code.OK) {
-        if (getDataResponse.stat.getEphemeralOwner != zookeeperClient.sessionId) {
-          error(s"Error while creating ephemeral at $path with return code: $code")
-          Code.NODEEXISTS
-        } else {
-          code
-        }
-      } else if (code == Code.NONODE) {
-        info(s"The ephemeral node at $path went away while reading it")
-        create()
-      } else {
-        error(s"Error while creating ephemeral at $path with return code: $code")
-        code
-      }
-    }
-  }
-}
-
-object KafkaControllerZkUtils {
-
-  /**
-   * @param successfulPartitions The successfully updated partition states with adjusted znode versions.
-   * @param partitionsToRetry The partitions that we should retry due to a ZooKeeper BADVERSION conflict. Version
-   *                          conflicts can occur if the partition leader updated partition state while the
-   *                          controller attempted to update partition state.
-   * @param failedPartitions Exceptions corresponding to failed partition state updates.
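-   *
-   * Callers typically destructure the result; a sketch mirroring the call sites in PartitionStateMachine:
-   * {{{
-   * val UpdateLeaderAndIsrResult(successful, toRetry, failed) = zkUtils.updateLeaderAndIsr(leaderAndIsrs, epoch)
-   * }}}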
-   */
-  case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicAndPartition, LeaderAndIsr],
-                                      partitionsToRetry: Seq[TopicAndPartition],
-                                      failedPartitions: Map[TopicAndPartition, Exception])
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 1c87b5e..1dee71d 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -18,9 +18,10 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.common.{StateChangeFailedException, TopicAndPartition}
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
 
@@ -43,7 +44,7 @@ class PartitionStateMachine(config: KafkaConfig,
                             stateChangeLogger: StateChangeLogger,
                             controllerContext: ControllerContext,
                             topicDeletionManager: TopicDeletionManager,
-                            zkUtils: KafkaControllerZkUtils,
+                            zkClient: KafkaZkClient,
                             partitionState: mutable.Map[TopicAndPartition, PartitionState],
                             controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
   private val controllerId = config.brokerId
@@ -217,7 +218,7 @@ class PartitionStateMachine(config: KafkaConfig,
       partition -> leaderIsrAndControllerEpoch
     }.toMap
     val createResponses = try {
-      zkUtils.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+      zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
     } catch {
       case e: Exception =>
         partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
@@ -278,7 +279,7 @@ class PartitionStateMachine(config: KafkaConfig,
   private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
   (Seq[TopicAndPartition], Seq[TopicAndPartition], Map[TopicAndPartition, Exception]) = {
     val getDataResponses = try {
-      zkUtils.getTopicPartitionStatesRaw(partitions)
+      zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
         return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
@@ -331,7 +332,7 @@ class PartitionStateMachine(config: KafkaConfig,
     }
     val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
     val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
-    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(
+    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch)
     successfulUpdates.foreach { case (partition, leaderAndIsr) =>
       val replicas = controllerContext.partitionReplicaAssignment(partition)
@@ -349,7 +350,7 @@ class PartitionStateMachine(config: KafkaConfig,
       val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
       liveInSyncReplicas.isEmpty
     }
-    val (logConfigs, failed) = zkUtils.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
+    val (logConfigs, failed) = zkClient.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
     val partitionsWithUncleanLeaderElectionState = partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
       if (failed.contains(partition.topic)) {
         logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 4da1c7b..e41007b 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -19,9 +19,10 @@ package kafka.controller
 import kafka.api.LeaderAndIsr
 import kafka.common.{StateChangeFailedException, TopicAndPartition}
 import kafka.controller.Callbacks.CallbackBuilder
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection.mutable
@@ -48,7 +49,7 @@ class ReplicaStateMachine(config: KafkaConfig,
                           stateChangeLogger: StateChangeLogger,
                           controllerContext: ControllerContext,
                           topicDeletionManager: TopicDeletionManager,
-                          zkUtils: KafkaControllerZkUtils,
+                          zkClient: KafkaZkClient,
                           replicaState: mutable.Map[PartitionAndReplica, ReplicaState],
                           controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
   private val controllerId = config.brokerId
@@ -292,7 +293,7 @@ class ReplicaStateMachine(config: KafkaConfig,
       val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
       leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
     }
-    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(
+    val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch)
     val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
       if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
@@ -325,7 +326,7 @@ class ReplicaStateMachine(config: KafkaConfig,
     val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicAndPartition]
     val failed = mutable.Map.empty[TopicAndPartition, Exception]
     val getDataResponses = try {
-      zkUtils.getTopicPartitionStatesRaw(partitions)
+      zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
       case e: Exception =>
         partitions.foreach(partition => failed.put(partition, e))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 52302a3..2e93f9d 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -19,6 +19,7 @@ package kafka.controller
 
 import kafka.common.TopicAndPartition
 import kafka.utils.Logging
+import kafka.zk.KafkaZkClient
 
 import scala.collection.{Set, mutable}
 
@@ -57,7 +58,7 @@ import scala.collection.{Set, mutable}
  */
 class TopicDeletionManager(controller: KafkaController,
                            eventManager: ControllerEventManager,
-                           kafkaControllerZkUtils: KafkaControllerZkUtils) extends Logging {
+                           zkClient: KafkaZkClient) extends Logging {
   this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
   val controllerContext = controller.controllerContext
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
@@ -73,7 +74,7 @@ class TopicDeletionManager(controller: KafkaController,
     } else {
       // if delete topic is disabled clean the topic entries under /admin/delete_topics
       info("Removing " + initialTopicsToBeDeleted + " since delete topic is disabled")
-      kafkaControllerZkUtils.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
+      zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
     }
   }
 
@@ -239,9 +240,9 @@ class TopicDeletionManager(controller: KafkaController,
     controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq, NonExistentPartition)
     topicsToBeDeleted -= topic
     partitionsToBeDeleted.retain(_.topic != topic)
-    kafkaControllerZkUtils.deleteTopicZNode(topic)
-    kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic))
-    kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic))
+    zkClient.deleteTopicZNode(topic)
+    zkClient.deleteTopicConfigs(Seq(topic))
+    zkClient.deleteTopicDeletions(Seq(topic))
     controllerContext.removeTopic(topic)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZkData.scala b/core/src/main/scala/kafka/controller/ZkData.scala
deleted file mode 100644
index 2240b6a..0000000
--- a/core/src/main/scala/kafka/controller/ZkData.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.controller
-
-import java.util.Properties
-
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
-import kafka.cluster.{Broker, EndPoint}
-import kafka.common.TopicAndPartition
-import kafka.utils.Json
-import org.apache.zookeeper.data.Stat
-
-import scala.collection.Seq
-
-object ControllerZNode {
-  def path = "/controller"
-  def encode(brokerId: Int, timestamp: Long): Array[Byte] =
-    Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)).getBytes("UTF-8")
-  def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-    js.asJsonObject("brokerid").to[Int]
-  }
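-
-  // Example of the JSON stored at /controller (illustrative values):
-  //   {"version":1,"brokerid":1,"timestamp":"1509000000000"}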
-}
-
-object ControllerEpochZNode {
-  def path = "/controller_epoch"
-  def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes("UTF-8")
-  def decode(bytes: Array[Byte]) : Int = new String(bytes, "UTF-8").toInt
-}
-
-object ConfigZNode {
-  def path = "/config"
-  def encode: Array[Byte] = null
-}
-
-object BrokersZNode {
-  def path = "/brokers"
-  def encode: Array[Byte] = null
-}
-
-object BrokerIdsZNode {
-  def path = s"${BrokersZNode.path}/ids"
-  def encode: Array[Byte] = null
-}
-
-object BrokerIdZNode {
-  def path(id: Int) = s"${BrokerIdsZNode.path}/$id"
-  def encode(id: Int,
-             host: String,
-             port: Int,
-             advertisedEndpoints: Seq[EndPoint],
-             jmxPort: Int,
-             rack: Option[String],
-             apiVersion: ApiVersion): Array[Byte] = {
-    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
-    Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes("UTF-8")
-  }
-
-  def decode(id: Int, bytes: Array[Byte]): Broker = {
-    Broker.createBroker(id, new String(bytes, "UTF-8"))
-  }
-}
-
-object TopicsZNode {
-  def path = s"${BrokersZNode.path}/topics"
-  def encode: Array[Byte] = null
-}
-
-object TopicZNode {
-  def path(topic: String) = s"${TopicsZNode.path}/$topic"
-  def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
-    val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
-    Json.encode(Map("version" -> 1, "partitions" -> assignmentJson)).getBytes("UTF-8")
-  }
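-
-  // Example of the JSON stored at /brokers/topics/<topic> (illustrative values):
-  //   {"version":1,"partitions":{"0":[1,2],"1":[2,3]}}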
-  def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
-    Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
-      val assignmentJson = js.asJsonObject
-      val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject)
-      partitionsJsonOpt.map { partitionsJson =>
-        partitionsJson.iterator.map { case (partition, replicas) =>
-          TopicAndPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
-        }
-      }
-    }.map(_.toMap).getOrElse(Map.empty)
-  }
-}
-
-object TopicPartitionsZNode {
-  def path(topic: String) = s"${TopicZNode.path(topic)}/partitions"
-  def encode: Array[Byte] = null
-}
-
-object TopicPartitionZNode {
-  def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
-  def encode: Array[Byte] = null
-}
-
-object TopicPartitionStateZNode {
-  def path(partition: TopicAndPartition) = s"${TopicPartitionZNode.path(partition)}/state"
-  def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
-    val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-    val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-    Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
-      "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)).getBytes("UTF-8")
-  }
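-
-  // Example of the JSON stored at /brokers/topics/<topic>/partitions/<partition>/state (illustrative values;
-  // field order may vary):
-  //   {"version":1,"leader":1,"leader_epoch":5,"controller_epoch":3,"isr":[1,2,3]}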
-  def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-      val leaderIsrAndEpochInfo = js.asJsonObject
-      val leader = leaderIsrAndEpochInfo("leader").to[Int]
-      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
-      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
-      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
-      val zkPathVersion = stat.getVersion
-      LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)
-    }
-  }
-}
-
-object ConfigEntityTypeZNode {
-  def path(entityType: String) = s"${ConfigZNode.path}/$entityType"
-  def encode: Array[Byte] = null
-}
-
-object ConfigEntityZNode {
-  def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName"
-  def encode(config: Properties): Array[Byte] = {
-    import scala.collection.JavaConverters._
-    Json.encode(Map("version" -> 1, "config" -> config.asScala)).getBytes("UTF-8")
-  }
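-
-  // Example of the JSON stored under /config/<entityType>/<entityName> (illustrative values):
-  //   {"version":1,"config":{"retention.ms":"604800000"}}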
-  def decode(bytes: Array[Byte]): Option[Properties] = {
-    Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-      val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
-      val props = new Properties()
-      configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
-      props
-    }
-  }
-}
-
-object IsrChangeNotificationZNode {
-  def path = "/isr_change_notification"
-  def encode: Array[Byte] = null
-}
-
-object IsrChangeNotificationSequenceZNode {
-  val SequenceNumberPrefix = "isr_change_"
-  def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
-  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
-    val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
-    Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson)).getBytes("UTF-8")
-  }
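-
-  // Example of a notification payload (illustrative values):
-  //   {"version":1,"partitions":[{"topic":"t","partition":0}]}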
-
-  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-    val partitionsJson = js.asJsonObject("partitions").asJsonArray
-    partitionsJson.iterator.map { partitionJs =>
-      val partitionJson = partitionJs.asJsonObject
-      val topic = partitionJson("topic").to[String]
-      val partition = partitionJson("partition").to[Int]
-      TopicAndPartition(topic, partition)
-    }
-  }.map(_.toSet).getOrElse(Set.empty)
-
-  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
-}
-
-object LogDirEventNotificationZNode {
-  def path = "/log_dir_event_notification"
-  def encode: Array[Byte] = null
-}
-
-object LogDirEventNotificationSequenceZNode {
-  val SequenceNumberPrefix = "log_dir_event_"
-  val LogDirFailureEvent = 1
-  def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
-  def encode(brokerId: Int) =
-    Json.encode(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent)).getBytes("UTF-8")
-  def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-    js.asJsonObject("broker").to[Int]
-  }
-  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
-}
-
-object AdminZNode {
-  def path = "/admin"
-  def encode: Array[Byte] = null
-}
-
-object DeleteTopicsZNode {
-  def path = s"${AdminZNode.path}/delete_topics"
-  def encode: Array[Byte] = null
-}
-
-object DeleteTopicsTopicZNode {
-  def path(topic: String) = s"${DeleteTopicsZNode.path}/$topic"
-  def encode: Array[Byte] = null
-}
-
-object ReassignPartitionsZNode {
-  def path = s"${AdminZNode.path}/reassign_partitions"
-  def encode(reassignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
-    val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) =>
-      Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
-    }
-    Json.encode(Map("version" -> 1, "partitions" -> reassignmentJson)).getBytes("UTF-8")
-  }
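-
-  // Example of the JSON stored at /admin/reassign_partitions (illustrative values):
-  //   {"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]}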
-  def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
-    val reassignmentJson = js.asJsonObject
-    val partitionsJsonOpt = reassignmentJson.get("partitions")
-    partitionsJsonOpt.map { partitionsJson =>
-      partitionsJson.asJsonArray.iterator.map { partitionFieldsJs =>
-        val partitionFields = partitionFieldsJs.asJsonObject
-        val topic = partitionFields("topic").to[String]
-        val partition = partitionFields("partition").to[Int]
-        val replicas = partitionFields("replicas").to[Seq[Int]]
-        TopicAndPartition(topic, partition) -> replicas
-      }
-    }
-  }.map(_.toMap).getOrElse(Map.empty)
-}
-
-object PreferredReplicaElectionZNode {
-  def path = s"${AdminZNode.path}/preferred_replica_election"
-  def encode(partitions: Set[TopicAndPartition]): Array[Byte] =
-    Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))).getBytes("UTF-8")
-  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
-    val partitionsJson = js.asJsonObject("partitions").asJsonArray
-    partitionsJson.iterator.map { partitionJs =>
-      val partitionJson = partitionJs.asJsonObject
-      val topic = partitionJson("topic").to[String]
-      val partition = partitionJson("partition").to[Int]
-      TopicAndPartition(topic, partition)
-    }
-  }.map(_.toSet).getOrElse(Set.empty)
-}
\ No newline at end of file