Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/01 01:06:56 UTC

kafka git commit: KAFKA-6072; Use ZooKeeperClient in GroupCoordinator and TransactionCoordinator

Repository: kafka
Updated Branches:
  refs/heads/trunk 517870271 -> f88fdbd31


KAFKA-6072; Use ZooKeeperClient in GroupCoordinator and TransactionCoordinator

Author: Manikumar Reddy <ma...@gmail.com>

Reviewers: Ted Yu <yu...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #4126 from omkreddy/KAFKA-6072-ZK-IN-GRoupCoordinator


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f88fdbd3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f88fdbd3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f88fdbd3

Branch: refs/heads/trunk
Commit: f88fdbd3115cdb0f1bd26817513f3d33359512b1
Parents: 5178702
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Tue Oct 31 18:06:51 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Oct 31 18:06:51 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/GroupCoordinator.scala    | 11 +--
 .../group/GroupMetadataManager.scala            |  5 +-
 .../transaction/ProducerIdManager.scala         | 11 +--
 .../transaction/TransactionCoordinator.scala    |  9 ++-
 .../transaction/TransactionStateManager.scala   |  5 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  4 +-
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 82 +++++++++++++++++++-
 .../group/GroupCoordinatorTest.scala            | 11 +--
 .../group/GroupMetadataManagerTest.scala        | 14 ++--
 .../transaction/ProducerIdManagerTest.scala     | 24 +++---
 .../TransactionStateManagerTest.scala           | 13 ++--
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 74 +++++++++++++++++-
 12 files changed, 210 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index ed13a08..129eae4 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -23,7 +23,8 @@ import kafka.common.OffsetAndMetadata
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
 import kafka.server._
-import kafka.utils._
+import kafka.utils.Logging
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
@@ -819,12 +820,12 @@ object GroupCoordinator {
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
 
   def apply(config: KafkaConfig,
-            zkUtils: ZkUtils,
+            zkClient: KafkaZkClient,
             replicaManager: ReplicaManager,
             time: Time): GroupCoordinator = {
     val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
     val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
-    apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
+    apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time)
   }
 
   private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
@@ -841,7 +842,7 @@ object GroupCoordinator {
   )
 
   def apply(config: KafkaConfig,
-            zkUtils: ZkUtils,
+            zkClient: KafkaZkClient,
             replicaManager: ReplicaManager,
             heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
             joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
@@ -852,7 +853,7 @@ object GroupCoordinator {
       groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
 
     val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
-      offsetConfig, replicaManager, zkUtils, time)
+      offsetConfig, replicaManager, zkClient, time)
     new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 67a048d..7e6a643 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -31,6 +31,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.inLock
 import kafka.utils._
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
@@ -50,7 +51,7 @@ class GroupMetadataManager(brokerId: Int,
                            interBrokerProtocolVersion: ApiVersion,
                            config: OffsetConfig,
                            replicaManager: ReplicaManager,
-                           zkUtils: ZkUtils,
+                           zkClient: KafkaZkClient,
                            time: Time) extends Logging with KafkaMetricsGroup {
 
   private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
@@ -842,7 +843,7 @@ class GroupMetadataManager(brokerId: Int,
    * If the topic does not exist, the configured partition count is returned.
    */
   private def getGroupMetadataTopicPartitionCount: Int = {
-    zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
+    zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)
   }
 
   /**

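The one-line swap above relies on the new KafkaZkClient.getTopicPartitionCount added later in this patch: it returns Some(count) when the topic's assignment exists in ZooKeeper and None otherwise, so the getOrElse fallback to the configured default is preserved. A minimal sketch of the call (the topic literal is the real name behind Topic.GROUP_METADATA_TOPIC_NAME):

    // Sketch only: Some(n) if the topic exists in ZooKeeper, else None,
    // in which case the broker-side default partition count is used.
    val partitionCount: Int =
      zkClient.getTopicPartitionCount("__consumer_offsets")
        .getOrElse(config.offsetsTopicNumPartitions)
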
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index f7bde96..6be3c6b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -18,6 +18,7 @@ package kafka.coordinator.transaction
 
 import kafka.common.KafkaException
 import kafka.utils.{Json, Logging, ZkUtils}
+import kafka.zk.KafkaZkClient
 
 /**
  * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way
@@ -65,7 +66,7 @@ case class ProducerIdBlock(brokerId: Int, blockStartId: Long, blockEndId: Long)
   }
 }
 
-class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging {
+class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends Logging {
 
   this.logIdent = "[ProducerId Manager " + brokerId + "]: "
 
@@ -82,7 +83,7 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
-      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
+      val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath)
 
       // generate the new producerId block
       currentProducerIdBlock = dataOpt match {
@@ -105,7 +106,7 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
       val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.ProducerIdBlockPath,
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ZkUtils.ProducerIdBlockPath,
         newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
       zkWriteComplete = succeeded
 
@@ -114,10 +115,10 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging
     }
   }
 
-  private def checkProducerIdBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: String): (Boolean, Int) = {
     try {
       val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath)
+      val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath)
       dataOpt match {
         case Some(data) =>
           val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)

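For context, ProducerIdManager's block allocation is a classic ZooKeeper compare-and-swap loop: read the data with its version, compute a new value, then write conditionally on that version and retry on a lost race. A minimal sketch of the same loop shape against the new KafkaZkClient API (the counter path and payload are hypothetical, and the path is assumed to already exist, since conditionalUpdatePath returns (false, -1) on a missing node):

    // Sketch only: version-checked read-modify-write, analogous in shape to
    // ProducerIdManager.getNewProducerIdBlock. The path below is made up.
    def incrementCounter(zkClient: KafkaZkClient): Long = {
      var result: Option[Long] = None
      while (result.isEmpty) {
        // Read the current value together with its zk node version.
        val (dataOpt, zkVersion) = zkClient.getDataAndVersion("/hypothetical/counter")
        val next = dataOpt.map(_.toLong).getOrElse(0L) + 1
        // The write succeeds only if no other writer bumped the version in between.
        val (succeeded, _) = zkClient.conditionalUpdatePath("/hypothetical/counter", next.toString, zkVersion)
        if (succeeded) result = Some(next)
      }
      result.get
    }
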
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 6ad1f40..2cc719d 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -20,7 +20,8 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager}
-import kafka.utils.{Logging, Scheduler, ZkUtils}
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
@@ -34,7 +35,7 @@ object TransactionCoordinator {
   def apply(config: KafkaConfig,
             replicaManager: ReplicaManager,
             scheduler: Scheduler,
-            zkUtils: ZkUtils,
+            zkClient: KafkaZkClient,
             metrics: Metrics,
             metadataCache: MetadataCache,
             time: Time): TransactionCoordinator = {
@@ -50,11 +51,11 @@ object TransactionCoordinator {
       config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
       config.requestTimeoutMs)
 
-    val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils)
+    val producerIdManager = new ProducerIdManager(config.brokerId, zkClient)
     // we do not need to turn on reaper thread since no tasks will be expired and there are no completed tasks to be purged
     val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId,
       reaperEnabled = false, timerEnabled = false)
-    val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
+    val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig, time)
 
     val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
     val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index f2e25c4..b962e82 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -29,6 +29,7 @@ import kafka.server.Defaults
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
@@ -68,7 +69,7 @@ object TransactionStateManager {
  * </ul>
  */
 class TransactionStateManager(brokerId: Int,
-                              zkUtils: ZkUtils,
+                              zkClient: KafkaZkClient,
                               scheduler: Scheduler,
                               replicaManager: ReplicaManager,
                               config: TransactionConfig,
@@ -282,7 +283,7 @@ class TransactionStateManager(brokerId: Int,
    * If the topic does not exist, the default partition count is returned.
    */
   private def getTransactionTopicPartitionCount: Int = {
-    zkUtils.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogNumPartitions)
+    zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogNumPartitions)
   }
 
   private def loadTransactionMetadata(topicPartition: TopicPartition, coordinatorEpoch: Int): Pool[String, TransactionMetadata] =  {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e870ce4..f8111ff 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -259,12 +259,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         /* start group coordinator */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
-        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
+        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
         groupCoordinator.startup()
 
         /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
         // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
-        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM)
+        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
         transactionCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 925a6f6..026dc9d 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -16,6 +16,7 @@
 */
 package kafka.zk
 
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 
 import kafka.api.LeaderAndIsr
@@ -26,13 +27,13 @@ import kafka.log.LogConfig
 import kafka.server.ConfigType
 import kafka.utils._
 import kafka.zookeeper._
+import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
 import org.apache.zookeeper.{CreateMode, KeeperException}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import org.apache.kafka.common.TopicPartition
 
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -311,6 +312,85 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   }
 
   /**
+   * Gets the partition count for a given topic.
+   * @param topic The topic to get the partition count for.
+   * @return Some(partition count) if the topic exists, None otherwise.
+   */
+  def getTopicPartitionCount(topic: String): Option[Int] = {
+    val topicData = getReplicaAssignmentForTopics(Set(topic))
+    if (topicData.nonEmpty)
+      Some(topicData.size)
+    else
+      None
+  }
+
+  /**
+   * Gets the data and version at the given zk path.
+   * @param path zk node path
+   * @return A tuple of 2 elements, where the first element is the zk node data as a string
+   *         and the second element is the zk node version.
+   *         Returns (None, -1) if the node doesn't exist; throws an exception on any other error.
+   */
+  def getDataAndVersion(path: String): (Option[String], Int) = {
+    val getDataRequest = GetDataRequest(path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+
+    if (getDataResponse.resultCode == Code.OK) {
+      if (getDataResponse.data == null)
+        (None, getDataResponse.stat.getVersion)
+      else {
+        val data = new String(getDataResponse.data, UTF_8)
+        (Some(data), getDataResponse.stat.getVersion)
+      }
+    } else if (getDataResponse.resultCode == Code.NONODE)
+      (None, -1)
+    else
+      throw getDataResponse.resultException.get
+  }
+
+  /**
+   * Conditionally updates the data at the given persistent path. Returns (true, newVersion) if the update succeeds;
+   * otherwise (the path does not exist, the current version is not the expected version, etc.) returns (false, -1).
+   *
+   * When a ConnectionLossException occurs during the update, ZooKeeperClient retries it, and the retry may fail
+   * since the previous attempt may have succeeded (so the stored zkVersion no longer matches the expected one).
+   * In this case, we run the optionalChecker to further check whether the previous write did indeed succeed.
+   */
+  def conditionalUpdatePath(path: String, data: String, expectVersion: Int,
+                            optionalChecker: Option[(KafkaZkClient, String, String) => (Boolean, Int)] = None): (Boolean, Int) = {
+
+    val setDataRequest = SetDataRequest(path, data.getBytes(UTF_8), expectVersion)
+    val setDataResponse = retryRequestUntilConnected(setDataRequest)
+
+    setDataResponse.resultCode match {
+      case Code.OK =>
+        debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
+          .format(path, data, expectVersion, setDataResponse.stat.getVersion))
+        (true, setDataResponse.stat.getVersion)
+
+      case Code.BADVERSION =>
+        optionalChecker match {
+          case Some(checker) => checker(this, path, data)
+          case _ =>
+            debug("Checker method is not passed skipping zkData match")
+            debug("Conditional update of path %s with data %s and expected version %d failed due to %s"
+              .format(path, data, expectVersion, setDataResponse.resultException.get.getMessage))
+            (false, -1)
+        }
+
+      case Code.NONODE =>
+        debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+          expectVersion, setDataResponse.resultException.get.getMessage))
+        (false, -1)
+
+      case _ =>
+        debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+          expectVersion, setDataResponse.resultException.get.getMessage))
+        throw setDataResponse.resultException.get
+    }
+  }
+
+  /**
    * Get all topics marked for deletion.
    * @return sequence of topics marked for deletion.
    */

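The optionalChecker hook above exists because a retried write after a connection loss can come back as BADVERSION even though the first attempt actually landed. A hedged sketch of such a checker, mirroring the shape of checkProducerIdBlockZkData from this patch (the function name is illustrative):

    // Sketch only: resolve the ambiguous BADVERSION-after-retry case by
    // checking whether the stored data already equals what we tried to write.
    def sameDataChecker(zkClient: KafkaZkClient, path: String, expectedData: String): (Boolean, Int) = {
      val (dataOpt, zkVersion) = zkClient.getDataAndVersion(path)
      dataOpt match {
        case Some(data) if data == expectedData => (true, zkVersion) // the earlier write stuck
        case _ => (false, -1)
      }
    }

    // Supplied as the optional fourth argument:
    //   zkClient.conditionalUpdatePath(path, data, expectedVersion, Some(sameDataChecker))
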
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 22efb33..1c770a4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -30,6 +30,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.internals.Topic
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
@@ -61,7 +62,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   var groupCoordinator: GroupCoordinator = null
   var replicaManager: ReplicaManager = null
   var scheduler: KafkaScheduler = null
-  var zkUtils: ZkUtils = null
+  var zkClient: KafkaZkClient = null
 
   private val groupId = "groupId"
   private val protocolType = "consumer"
@@ -85,10 +86,10 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
-    zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+    zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
-    EasyMock.replay(zkUtils)
+    EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
+    EasyMock.replay(zkClient)
 
     timer = new MockTimer
 
@@ -97,7 +98,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
     val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
 
-    groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
+    groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
     groupCoordinator.startup(false)
 
     // add the partition into the owned partition list

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index b437405..abedcb8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -23,7 +23,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.log.{Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.TestUtils.fail
-import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
+import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
@@ -42,13 +42,15 @@ import scala.collection.JavaConverters._
 import scala.collection._
 import java.util.concurrent.locks.ReentrantLock
 
+import kafka.zk.KafkaZkClient
+
 class GroupMetadataManagerTest {
 
   var time: MockTime = null
   var replicaManager: ReplicaManager = null
   var groupMetadataManager: GroupMetadataManager = null
   var scheduler: KafkaScheduler = null
-  var zkUtils: ZkUtils = null
+  var zkClient: KafkaZkClient = null
   var partition: Partition = null
 
   val groupId = "foo"
@@ -74,13 +76,13 @@ class GroupMetadataManagerTest {
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
 
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
-    EasyMock.replay(zkUtils)
+    zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
+    EasyMock.replay(zkClient)
 
     time = new MockTime
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkUtils, time)
+    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time)
     partition = EasyMock.niceMock(classOf[Partition])
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 39353b8..c5b42d4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -17,34 +17,34 @@
 package kafka.coordinator.transaction
 
 import kafka.common.KafkaException
-import kafka.utils.ZkUtils
+import kafka.zk.KafkaZkClient
 import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.{After, Test}
 import org.junit.Assert._
 
 class ProducerIdManagerTest {
 
-  private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+  private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
 
   @After
   def tearDown(): Unit = {
-    EasyMock.reset(zkUtils)
+    EasyMock.reset(zkClient)
   }
 
   @Test
   def testGetProducerId() {
     var zkVersion: Option[Int] = None
     var data: String = null
-    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
+    EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
       override def answer(): (Option[String], Int) = zkVersion.map(Some(data) -> _).getOrElse(None, 0)
     }).anyTimes()
 
     val capturedVersion: Capture[Int] = EasyMock.newCapture()
     val capturedData: Capture[String] = EasyMock.newCapture()
-    EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
+    EasyMock.expect(zkClient.conditionalUpdatePath(EasyMock.anyString(),
       EasyMock.capture(capturedData),
       EasyMock.capture(capturedVersion),
-      EasyMock.anyObject[Option[(ZkUtils, String, String) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] {
+      EasyMock.anyObject[Option[(KafkaZkClient, String, String) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] {
         override def answer(): (Boolean, Int) = {
           val newZkVersion = capturedVersion.getValue + 1
           zkVersion = Some(newZkVersion)
@@ -53,10 +53,10 @@ class ProducerIdManagerTest {
         }
       }).anyTimes()
 
-    EasyMock.replay(zkUtils)
+    EasyMock.replay(zkClient)
 
-    val manager1 = new ProducerIdManager(0, zkUtils)
-    val manager2 = new ProducerIdManager(1, zkUtils)
+    val manager1 = new ProducerIdManager(0, zkClient)
+    val manager2 = new ProducerIdManager(1, zkClient)
 
     val pid1 = manager1.generateProducerId()
     val pid2 = manager2.generateProducerId()
@@ -76,15 +76,15 @@ class ProducerIdManagerTest {
 
   @Test(expected = classOf[KafkaException])
   def testExceedProducerIdLimit() {
-    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
+    EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] {
       override def answer(): (Option[String], Int) = {
         val json = ProducerIdManager.generateProducerIdBlockJson(
           ProducerIdBlock(0, Long.MaxValue - ProducerIdManager.PidBlockSize, Long.MaxValue))
         (Some(json), 0)
       }
     }).anyTimes()
-    EasyMock.replay(zkUtils)
-    new ProducerIdManager(0, zkUtils)
+    EasyMock.replay(zkClient)
+    new ProducerIdManager(0, zkClient)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 7973b9a..20dfaa6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -21,8 +21,9 @@ import java.util.concurrent.locks.ReentrantLock
 
 import kafka.log.Log
 import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
-import kafka.utils.{MockScheduler, Pool, ZkUtils}
+import kafka.utils.{MockScheduler, Pool}
 import kafka.utils.TestUtils.fail
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
 import org.apache.kafka.common.protocol.Errors
@@ -51,17 +52,17 @@ class TransactionStateManagerTest {
 
   val time = new MockTime()
   val scheduler = new MockScheduler(time)
-  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+  val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
   val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
-  EasyMock.expect(zkUtils.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME))
+  EasyMock.expect(zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME))
     .andReturn(Some(numPartitions))
     .anyTimes()
 
-  EasyMock.replay(zkUtils)
+  EasyMock.replay(zkClient)
 
   val txnConfig = TransactionConfig()
-  val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkUtils, scheduler, replicaManager, txnConfig, time)
+  val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time)
 
   val transactionalId1: String = "one"
   val transactionalId2: String = "two"
@@ -82,7 +83,7 @@ class TransactionStateManagerTest {
 
   @After
   def tearDown() {
-    EasyMock.reset(zkUtils, replicaManager)
+    EasyMock.reset(zkClient, replicaManager)
     transactionManager.shutdown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 00c0a02..f2d95c2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,11 +16,14 @@
 */
 package kafka.zk
 
+import kafka.common.TopicAndPartition
+import kafka.utils.ZkUtils
 import kafka.zookeeper.ZooKeeperClient
-
-import org.junit.{After, Before, Test}
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.apache.kafka.common.TopicPartition
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable
 
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
@@ -89,4 +92,69 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     intercept[IllegalArgumentException](zkClient.createRecursive("create-invalid-path"))
   }
 
+  @Test
+  def testGetTopicPartitionCount() {
+    val topic = "mytest"
+
+    // test with non-existing topic
+    assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty)
+
+    // create a topic path
+    zkClient.createRecursive(ZkUtils.getTopicPath(topic))
+
+    val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]()
+    assignment.put(new TopicAndPartition(topic, 0), Seq(0,1))
+    assignment.put(new TopicAndPartition(topic, 1), Seq(0,1))
+    zkClient.setTopicAssignmentRaw(topic, assignment.toMap)
+
+    assertEquals(2, zkClient.getTopicPartitionCount(topic).get)
+  }
+
+
+  @Test
+  def testGetDataAndVersion() {
+    val path = "/testpath"
+
+    // test with non-existing path
+    var dataAndVersion = zkClient.getDataAndVersion(path)
+    assertTrue(dataAndVersion._1.isEmpty)
+    assertEquals(-1, dataAndVersion._2)
+
+    // create a test path
+    zkClient.createRecursive(path)
+    zkClient.conditionalUpdatePath(path, "version1", 0)
+
+    // test with existing path
+    dataAndVersion = zkClient.getDataAndVersion(path)
+    assertEquals("version1", dataAndVersion._1.get)
+    assertEquals(1, dataAndVersion._2)
+
+    zkClient.conditionalUpdatePath(path, "version2", 1)
+    dataAndVersion = zkClient.getDataAndVersion(path)
+    assertEquals("version2", dataAndVersion._1.get)
+    assertEquals(2, dataAndVersion._2)
+  }
+
+  @Test
+  def testConditionalUpdatePath() {
+    val path = "/testconditionalpath"
+
+    // test with non-existing path
+    var statusAndVersion = zkClient.conditionalUpdatePath(path, "version0", 0)
+    assertFalse(statusAndVersion._1)
+    assertEquals(-1, statusAndVersion._2)
+
+    // create path
+    zkClient.createRecursive(path)
+
+    // test with valid expected version
+    statusAndVersion = zkClient.conditionalUpdatePath(path, "version1", 0)
+    assertTrue(statusAndVersion._1)
+    assertEquals(1, statusAndVersion._2)
+
+    // test with invalid expected version
+    statusAndVersion = zkClient.conditionalUpdatePath(path, "version2", 2)
+    assertFalse(statusAndVersion._1)
+    assertEquals(-1, statusAndVersion._2)
+  }
 }
\ No newline at end of file