You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/24 11:25:44 UTC

kafka git commit: KAFKA-6074; Use ZookeeperClient in ReplicaManager and Partition

Repository: kafka
Updated Branches:
  refs/heads/trunk ac17ab4f0 -> 0bc2d0e02


KAFKA-6074; Use ZookeeperClient in ReplicaManager and Partition

Replaced ZkUtils with KafkaZkClient in ReplicaManager and Partition.

Relying on existing tests.

Author: tedyu <yu...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>, Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #4254 from tedyu/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bc2d0e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bc2d0e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bc2d0e0

Branch: refs/heads/trunk
Commit: 0bc2d0e02aaea5b2517c40f7a76654460467177c
Parents: ac17ab4
Author: tedyu <yu...@gmail.com>
Authored: Fri Nov 24 11:08:39 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Nov 24 11:25:31 2017 +0000

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  2 +-
 .../main/scala/kafka/cluster/Partition.scala    | 10 ++--
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  5 +-
 .../scala/kafka/server/ReplicaManager.scala     | 11 ++--
 .../main/scala/kafka/utils/LogDirUtils.scala    | 60 --------------------
 .../scala/kafka/utils/ReplicationUtils.scala    | 17 +++---
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 15 +++++
 core/src/main/scala/kafka/zk/ZkData.scala       |  7 +++
 .../ReplicaFetcherThreadFatalErrorTest.scala    |  2 +-
 .../server/HighwatermarkPersistenceTest.scala   | 17 +++---
 .../kafka/server/ReplicaManagerQuotasTest.scala |  5 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 19 ++++---
 .../unit/kafka/server/SimpleFetchTest.scala     |  7 ++-
 .../unit/kafka/utils/ReplicationUtilsTest.scala |  8 +--
 15 files changed, 79 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 09a65af..6665d25 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -58,7 +58,7 @@ trait AdminUtilities {
     }
   }
 
-  def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties
+  def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties
 }
 
 object AdminUtils extends Logging with AdminUtilities {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e29467d..91b86ee 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,6 @@ package kafka.cluster
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
 import kafka.api.Request
 import kafka.controller.KafkaController
@@ -29,6 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.server._
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
+import kafka.zk.AdminZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
 import org.apache.kafka.common.protocol.Errors
@@ -55,7 +55,7 @@ class Partition(val topic: String,
   // Do not use replicaManager if this partition is ReplicaManager.OfflinePartition
   private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1
   private val logManager = if (!isOffline) replicaManager.logManager else null
-  private val zkUtils = if (!isOffline) replicaManager.zkUtils else null
+  private val zkClient = if (!isOffline) replicaManager.zkClient else null
   // allReplicasMap includes both assigned replicas and the future replica if there is ongoing replica movement
   private val allReplicasMap = new Pool[Int, Replica]
   // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
@@ -171,8 +171,10 @@ class Partition(val topic: String,
   def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
     allReplicasMap.getAndMaybePut(replicaId, {
       if (isReplicaLocal(replicaId)) {
+        val adminZkClient = new AdminZkClient(zkClient)
+        val prop = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
         val config = LogConfig.fromProps(logManager.defaultConfig.originals,
-                                         AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
+                                         prop)
         val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId)
         val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
         val offsetMap = checkpoint.read()
@@ -661,7 +663,7 @@ class Partition(val topic: String,
 
   private def updateIsr(newIsr: Set[Replica]) {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
-    val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
+    val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
       newLeaderAndIsr, controllerEpoch, zkVersion)
 
     if(updateSucceeded) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7f61479..d026018 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -319,7 +319,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
-    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
+    new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
       brokerTopicStats, metadataCache, logDirFailureChannel)
 
   private def initZk(): ZkUtils = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 4413165..da94c4a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -20,12 +20,12 @@ package kafka.server
 import java.util
 
 import AbstractFetcherThread.ResultWithPartitions
-import kafka.admin.AdminUtils
 import kafka.api.{FetchRequest => _, _}
 import kafka.cluster.{BrokerEndPoint, Replica}
 import kafka.log.LogConfig
 import kafka.server.ReplicaFetcherThread._
 import kafka.server.epoch.LeaderEpochCache
+import kafka.zk.AdminZkClient
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -155,7 +155,8 @@ class ReplicaFetcherThread(name: String,
       // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
       // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
       // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
+      val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
+      if (!LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
         ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
         fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 886e80e..1344660 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,6 +30,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
@@ -132,7 +133,7 @@ object ReplicaManager {
 class ReplicaManager(val config: KafkaConfig,
                      metrics: Metrics,
                      time: Time,
-                     val zkUtils: ZkUtils,
+                     val zkClient: KafkaZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
@@ -148,7 +149,7 @@ class ReplicaManager(val config: KafkaConfig,
   def this(config: KafkaConfig,
            metrics: Metrics,
            time: Time,
-           zkUtils: ZkUtils,
+           zkClient: KafkaZkClient,
            scheduler: Scheduler,
            logManager: LogManager,
            isShuttingDown: AtomicBoolean,
@@ -157,7 +158,7 @@ class ReplicaManager(val config: KafkaConfig,
            metadataCache: MetadataCache,
            logDirFailureChannel: LogDirFailureChannel,
            threadNamePrefix: Option[String] = None) {
-    this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown,
+    this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown,
       quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
       DelayedOperationPurgatory[DelayedProduce](
         purgatoryName = "Produce", brokerId = config.brokerId,
@@ -265,7 +266,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (isrChangeSet.nonEmpty &&
         (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
           lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
-        ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
+        ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet)
         isrChangeSet.clear()
         lastIsrPropagationMs.set(now)
       }
@@ -1433,7 +1434,7 @@ class ReplicaManager(val config: KafkaConfig,
            s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
     }
     logManager.handleLogDirFailure(dir)
-    LogDirUtils.propagateLogDirEvent(zkUtils, localBrokerId)
+    zkClient.propagateLogDirEvent(localBrokerId)
     info(s"Stopped serving replicas in dir $dir")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/utils/LogDirUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala
deleted file mode 100644
index 669edf9..0000000
--- a/core/src/main/scala/kafka/utils/LogDirUtils.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.controller.LogDirEventNotificationHandler
-import scala.collection.Map
-
-object LogDirUtils extends Logging {
-
-  private val LogDirEventNotificationPrefix = "log_dir_event_"
-  val LogDirFailureEvent = 1
-
-  def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) {
-    val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath(
-      ZkUtils.LogDirEventNotificationPath + "/" + LogDirEventNotificationPrefix, logDirFailureEventZkData(brokerId))
-    debug(s"Added $logDirEventNotificationPath for broker $brokerId")
-  }
-
-  private def logDirFailureEventZkData(brokerId: Int): String = {
-    Json.encode(Map("version" -> LogDirEventNotificationHandler.Version, "broker" -> brokerId, "event" -> LogDirFailureEvent))
-  }
-
-  def deleteLogDirEvents(zkUtils: ZkUtils) {
-    val sequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).toSet
-    sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x))
-  }
-
-  def getBrokerIdFromLogDirEvent(zkUtils: ZkUtils, child: String): Option[Int] = {
-    val changeZnode = ZkUtils.LogDirEventNotificationPath + "/" + child
-    val (jsonOpt, _) = zkUtils.readDataMaybeNull(changeZnode)
-    jsonOpt.flatMap { json =>
-      val result = Json.parseFull(json).map(_.asJsonObject).map { jsObject =>
-        val brokerId = jsObject("broker").to[Int]
-        val eventType = jsObject("event").to[Int]
-        if (eventType != LogDirFailureEvent)
-          throw new IllegalArgumentException(s"The event type $eventType in znode $changeZnode is not recognized")
-        brokerId
-      }
-      if (result.isEmpty)
-        error(s"Invalid LogDirEvent in ZK node '$changeZnode', JSON: $json")
-      result
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 1a0633c..f5751d2 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -20,6 +20,7 @@ package kafka.utils
 import kafka.api.LeaderAndIsr
 import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
 import kafka.utils.ZkUtils._
+import kafka.zk._
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
 
@@ -29,28 +30,26 @@ object ReplicationUtils extends Logging {
 
   private val IsrChangeNotificationPrefix = "isr_change_"
 
-  def updateLeaderAndIsr(zkUtils: ZkUtils, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
+  def updateLeaderAndIsr(zkClient: KafkaZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
     zkVersion: Int): (Boolean,Int) = {
     debug(s"Updated ISR for $topic-$partitionId to ${newLeaderAndIsr.isr.mkString(",")}")
     val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId)
-    val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
+    val newLeaderData = LeaderAndIsrZNode.encode(newLeaderAndIsr, controllerEpoch)
     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val updatePersistentPath: (Boolean, Int) = zkUtils.conditionalUpdatePersistentPath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
+    val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
     updatePersistentPath
   }
 
-  def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicPartition]): Unit = {
-    val isrChangeNotificationPath: String = zkUtils.createSequentialPersistentPath(
+  def propagateIsrChanges(zkClient: KafkaZkClient, isrChangeSet: Set[TopicPartition]): Unit = {
+    val isrChangeNotificationPath: String = zkClient.createSequentialPersistentPath(
       ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
       generateIsrChangeJson(isrChangeSet))
     debug(s"Added $isrChangeNotificationPath for $isrChangeSet")
   }
 
-  private def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = {
+  private def checkLeaderAndIsrZkData(zkClient: KafkaZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = {
     try {
-      val writtenLeaderAndIsrInfo = zkUtils.readDataMaybeNull(path)
-      val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
-      val writtenStat = writtenLeaderAndIsrInfo._2
+      val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path)
       val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
       writtenLeaderOpt.foreach { writtenData =>
         val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index b419654..110d676 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -25,6 +25,7 @@ import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContex
 import kafka.log.LogConfig
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
 import kafka.security.auth.{Acl, Resource, ResourceType}
+
 import kafka.server.ConfigType
 import kafka.utils._
 import kafka.zookeeper._
@@ -49,6 +50,13 @@ import scala.collection.{Seq, mutable}
 class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging {
   import KafkaZkClient._
 
+  def createSequentialPersistentPath(path: String, data: String = ""): String = {
+    val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+    val createResponse = retryRequestUntilConnected(createRequest)
+    createResponse.resultException.foreach(e => throw e)
+    createResponse.path
+  }
+
   /**
    * Gets topic partition states for the given partitions.
    * @param partitions the partitions for which we want ot get states.
@@ -933,6 +941,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     createResponse.resultException.foreach(e => throw e)
   }
 
+  def propagateLogDirEvent(brokerId: Int) {
+    val logDirEventNotificationPath: String = createSequentialPersistentPath(
+      LogDirEventNotificationZNode.path + "/" + LogDirEventNotificationSequenceZNode.SequenceNumberPrefix,
+      new String(LogDirEventNotificationSequenceZNode.encode(brokerId), UTF_8))
+    debug(s"Added $logDirEventNotificationPath for broker $brokerId")
+  }
+
   /**
    * Deletes all Acl change notifications.
    * @throws KeeperException if there is an error while deleting Acl change notifications

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index a0085cd..be6efca 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -30,6 +30,13 @@ import org.apache.zookeeper.data.Stat
 
 // This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
 
+object LeaderAndIsrZNode {
+  def encode(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
+    Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
+                "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
+  }
+}
+
 object ControllerZNode {
   def path = "/controller"
   def encode(brokerId: Int, timestamp: Long): Array[Byte] =

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 10c7737..46badf0 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -109,7 +109,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
     val server = new KafkaServer(config, time) {
 
       override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
-        new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
+        new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
           quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
 
           override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index efae329..c0871a7 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -25,7 +25,8 @@ import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
 import kafka.cluster.Replica
-import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
+import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
+import kafka.zk.KafkaZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ class HighwatermarkPersistenceTest {
 
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
-  val zkUtils = EasyMock.createMock(classOf[ZkUtils])
+  val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
       logDirs = config.logDirs.map(new File(_)),
@@ -54,7 +55,7 @@ class HighwatermarkPersistenceTest {
   @Test
   def testHighWatermarkPersistenceSinglePartition() {
     // mock zkclient
-    EasyMock.replay(zkUtils)
+    EasyMock.replay(zkClient)
 
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
@@ -62,7 +63,7 @@ class HighwatermarkPersistenceTest {
     val metrics = new Metrics
     val time = new MockTime
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
+    val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler,
       logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
     replicaManager.startup()
@@ -86,7 +87,7 @@ class HighwatermarkPersistenceTest {
       replicaManager.checkpointHighWatermarks()
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
       assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
-      EasyMock.verify(zkUtils)
+      EasyMock.verify(zkClient)
     } finally {
       // shutdown the replica manager upon test completion
       replicaManager.shutdown(false)
@@ -100,14 +101,14 @@ class HighwatermarkPersistenceTest {
     val topic1 = "foo1"
     val topic2 = "foo2"
     // mock zkclient
-    EasyMock.replay(zkUtils)
+    EasyMock.replay(zkClient)
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     val metrics = new Metrics
     val time = new MockTime
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
+    val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient,
       scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
     replicaManager.startup()
@@ -155,7 +156,7 @@ class HighwatermarkPersistenceTest {
       // verify checkpointed hw for topic 1
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
       assertEquals(10L, topic1Partition0Hw)
-      EasyMock.verify(zkUtils)
+      EasyMock.verify(zkClient)
     } finally {
       // shutdown the replica manager upon test completion
       replicaManager.shutdown(false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 38085b8..334321a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.cluster.Replica
 import kafka.log.Log
 import kafka.utils._
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
@@ -148,7 +149,7 @@ class ReplicaManagerQuotasTest {
   }
 
   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
-    val zkUtils = createNiceMock(classOf[ZkUtils])
+    val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
     //Create log which handles both a regular read and a 0 bytes read
@@ -182,7 +183,7 @@ class ReplicaManagerQuotasTest {
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(logManager)
 
-    replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
+    replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler, logManager,
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
       new BrokerTopicStats, new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b9d884a..ce88688 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -22,9 +22,10 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.log.LogConfig
-import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
+import kafka.utils.{MockScheduler, MockTime, TestUtils}
 import TestUtils.createBroker
 import kafka.utils.timer.MockTimer
+import kafka.zk.KafkaZkClient
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -48,14 +49,16 @@ class ReplicaManagerTest {
   val time = new MockTime
   val metrics = new Metrics
   var zkClient: ZkClient = _
-  var zkUtils: ZkUtils = _
+  var kafkaZkClient: KafkaZkClient = _
 
   @Before
   def setUp() {
     zkClient = EasyMock.createMock(classOf[ZkClient])
+    kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+    EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new Properties()).anyTimes()
+    EasyMock.replay(kafkaZkClient)
     EasyMock.expect(zkClient.readData(EasyMock.anyString(), EasyMock.anyObject[Stat])).andReturn(null).anyTimes()
     EasyMock.replay(zkClient)
-    zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
   }
 
   @After
@@ -68,7 +71,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
@@ -87,7 +90,7 @@ class ReplicaManagerTest {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
@@ -105,7 +108,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
-    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName))
     try {
@@ -138,7 +141,7 @@ class ReplicaManagerTest {
     val metadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     EasyMock.replay(metadataCache)
-    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size))
 
@@ -610,7 +613,7 @@ class ReplicaManagerTest {
     val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
       purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
 
-    new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+    new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
       mockDeleteRecordsPurgatory, Option(this.getClass.getName))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 92b230e..0797b7b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -23,6 +23,7 @@ import kafka.utils._
 import kafka.cluster.Replica
 import kafka.log.Log
 import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.junit.{After, Before, Test}
@@ -69,8 +70,8 @@ class SimpleFetchTest {
   @Before
   def setUp() {
     // create nice mock since we don't particularly care about zkclient calls
-    val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.replay(zkUtils)
+    val kafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    EasyMock.replay(kafkaZkClient)
 
     // create nice mock since we don't particularly care about scheduler calls
     val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
@@ -111,7 +112,7 @@ class SimpleFetchTest {
     EasyMock.replay(logManager)
 
     // create the replica manager
-    replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
+    replicaManager = new ReplicaManager(configs.head, metrics, time, kafkaZkClient, scheduler, logManager,
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 02e5bb5..ebab756 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -66,7 +66,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
     EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
-    EasyMock.expect(replicaManager.zkUtils).andReturn(zkUtils)
+    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
     EasyMock.replay(replicaManager)
 
     zkUtils.makeSurePersistentPathExists(ZkUtils.IsrChangeNotificationPath)
@@ -75,14 +75,14 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
 
     // regular update
     val newLeaderAndIsr1 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, 0)
-    val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
+    val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkClient,
       "my-topic-test", partitionId, newLeaderAndIsr1, controllerEpoch, 0)
     assertTrue(updateSucceeded1)
     assertEquals(newZkVersion1, 1)
 
     // mismatched zkVersion with the same data
     val newLeaderAndIsr2 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, zkVersion + 1)
-    val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
+    val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkClient,
       "my-topic-test", partitionId, newLeaderAndIsr2, controllerEpoch, zkVersion + 1)
     assertTrue(updateSucceeded2)
     // returns true with existing zkVersion
@@ -90,7 +90,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
 
     // mismatched zkVersion and leaderEpoch
     val newLeaderAndIsr3 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion + 1)
-    val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
+    val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkClient,
       "my-topic-test", partitionId, newLeaderAndIsr3, controllerEpoch, zkVersion + 1)
     assertFalse(updateSucceeded3)
     assertEquals(newZkVersion3,-1)