You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/24 11:25:44 UTC
kafka git commit: KAFKA-6074;
Use ZookeeperClient in ReplicaManager and Partition
Repository: kafka
Updated Branches:
refs/heads/trunk ac17ab4f0 -> 0bc2d0e02
KAFKA-6074; Use ZookeeperClient in ReplicaManager and Partition
Replaced ZkUtils with KafkaZkClient in ReplicaManager and Partition.
Relying on existing tests.
Author: tedyu <yu...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>, Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #4254 from tedyu/trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bc2d0e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bc2d0e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bc2d0e0
Branch: refs/heads/trunk
Commit: 0bc2d0e02aaea5b2517c40f7a76654460467177c
Parents: ac17ab4
Author: tedyu <yu...@gmail.com>
Authored: Fri Nov 24 11:08:39 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Nov 24 11:25:31 2017 +0000
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 2 +-
.../main/scala/kafka/cluster/Partition.scala | 10 ++--
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../kafka/server/ReplicaFetcherThread.scala | 5 +-
.../scala/kafka/server/ReplicaManager.scala | 11 ++--
.../main/scala/kafka/utils/LogDirUtils.scala | 60 --------------------
.../scala/kafka/utils/ReplicationUtils.scala | 17 +++---
.../src/main/scala/kafka/zk/KafkaZkClient.scala | 15 +++++
core/src/main/scala/kafka/zk/ZkData.scala | 7 +++
.../ReplicaFetcherThreadFatalErrorTest.scala | 2 +-
.../server/HighwatermarkPersistenceTest.scala | 17 +++---
.../kafka/server/ReplicaManagerQuotasTest.scala | 5 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 19 ++++---
.../unit/kafka/server/SimpleFetchTest.scala | 7 ++-
.../unit/kafka/utils/ReplicationUtilsTest.scala | 8 +--
15 files changed, 79 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 09a65af..6665d25 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -58,7 +58,7 @@ trait AdminUtilities {
}
}
- def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties
+ def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties
}
object AdminUtils extends Logging with AdminUtilities {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e29467d..91b86ee 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,6 @@ package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock
import com.yammer.metrics.core.Gauge
-import kafka.admin.AdminUtils
import kafka.api.LeaderAndIsr
import kafka.api.Request
import kafka.controller.KafkaController
@@ -29,6 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server._
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
+import kafka.zk.AdminZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException}
import org.apache.kafka.common.protocol.Errors
@@ -55,7 +55,7 @@ class Partition(val topic: String,
// Do not use replicaManager if this partition is ReplicaManager.OfflinePartition
private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1
private val logManager = if (!isOffline) replicaManager.logManager else null
- private val zkUtils = if (!isOffline) replicaManager.zkUtils else null
+ private val zkClient = if (!isOffline) replicaManager.zkClient else null
// allReplicasMap includes both assigned replicas and the future replica if there is ongoing replica movement
private val allReplicasMap = new Pool[Int, Replica]
// The read lock is only required when multiple reads are executed and needs to be in a consistent manner
@@ -171,8 +171,10 @@ class Partition(val topic: String,
def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
allReplicasMap.getAndMaybePut(replicaId, {
if (isReplicaLocal(replicaId)) {
+ val adminZkClient = new AdminZkClient(zkClient)
+ val prop = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val config = LogConfig.fromProps(logManager.defaultConfig.originals,
- AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
+ prop)
val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId)
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
val offsetMap = checkpoint.read()
@@ -661,7 +663,7 @@ class Partition(val topic: String,
private def updateIsr(newIsr: Set[Replica]) {
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
- val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
+ val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
newLeaderAndIsr, controllerEpoch, zkVersion)
if(updateSucceeded) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7f61479..d026018 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -319,7 +319,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
- new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
+ new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel)
private def initZk(): ZkUtils = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 4413165..da94c4a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -20,12 +20,12 @@ package kafka.server
import java.util
import AbstractFetcherThread.ResultWithPartitions
-import kafka.admin.AdminUtils
import kafka.api.{FetchRequest => _, _}
import kafka.cluster.{BrokerEndPoint, Replica}
import kafka.log.LogConfig
import kafka.server.ReplicaFetcherThread._
import kafka.server.epoch.LeaderEpochCache
+import kafka.zk.AdminZkClient
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
@@ -155,7 +155,8 @@ class ReplicaFetcherThread(name: String,
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
- if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
+ val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
+ if (!LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 886e80e..1344660 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -30,6 +30,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@@ -132,7 +133,7 @@ object ReplicaManager {
class ReplicaManager(val config: KafkaConfig,
metrics: Metrics,
time: Time,
- val zkUtils: ZkUtils,
+ val zkClient: KafkaZkClient,
scheduler: Scheduler,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean,
@@ -148,7 +149,7 @@ class ReplicaManager(val config: KafkaConfig,
def this(config: KafkaConfig,
metrics: Metrics,
time: Time,
- zkUtils: ZkUtils,
+ zkClient: KafkaZkClient,
scheduler: Scheduler,
logManager: LogManager,
isShuttingDown: AtomicBoolean,
@@ -157,7 +158,7 @@ class ReplicaManager(val config: KafkaConfig,
metadataCache: MetadataCache,
logDirFailureChannel: LogDirFailureChannel,
threadNamePrefix: Option[String] = None) {
- this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown,
+ this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown,
quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", brokerId = config.brokerId,
@@ -265,7 +266,7 @@ class ReplicaManager(val config: KafkaConfig,
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
- ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
+ ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
@@ -1433,7 +1434,7 @@ class ReplicaManager(val config: KafkaConfig,
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
}
logManager.handleLogDirFailure(dir)
- LogDirUtils.propagateLogDirEvent(zkUtils, localBrokerId)
+ zkClient.propagateLogDirEvent(localBrokerId)
info(s"Stopped serving replicas in dir $dir")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/utils/LogDirUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala
deleted file mode 100644
index 669edf9..0000000
--- a/core/src/main/scala/kafka/utils/LogDirUtils.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import kafka.controller.LogDirEventNotificationHandler
-import scala.collection.Map
-
-object LogDirUtils extends Logging {
-
- private val LogDirEventNotificationPrefix = "log_dir_event_"
- val LogDirFailureEvent = 1
-
- def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) {
- val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath(
- ZkUtils.LogDirEventNotificationPath + "/" + LogDirEventNotificationPrefix, logDirFailureEventZkData(brokerId))
- debug(s"Added $logDirEventNotificationPath for broker $brokerId")
- }
-
- private def logDirFailureEventZkData(brokerId: Int): String = {
- Json.encode(Map("version" -> LogDirEventNotificationHandler.Version, "broker" -> brokerId, "event" -> LogDirFailureEvent))
- }
-
- def deleteLogDirEvents(zkUtils: ZkUtils) {
- val sequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.LogDirEventNotificationPath).toSet
- sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x))
- }
-
- def getBrokerIdFromLogDirEvent(zkUtils: ZkUtils, child: String): Option[Int] = {
- val changeZnode = ZkUtils.LogDirEventNotificationPath + "/" + child
- val (jsonOpt, _) = zkUtils.readDataMaybeNull(changeZnode)
- jsonOpt.flatMap { json =>
- val result = Json.parseFull(json).map(_.asJsonObject).map { jsObject =>
- val brokerId = jsObject("broker").to[Int]
- val eventType = jsObject("event").to[Int]
- if (eventType != LogDirFailureEvent)
- throw new IllegalArgumentException(s"The event type $eventType in znode $changeZnode is not recognized")
- brokerId
- }
- if (result.isEmpty)
- error(s"Invalid LogDirEvent in ZK node '$changeZnode', JSON: $json")
- result
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 1a0633c..f5751d2 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -20,6 +20,7 @@ package kafka.utils
import kafka.api.LeaderAndIsr
import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
import kafka.utils.ZkUtils._
+import kafka.zk._
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
@@ -29,28 +30,26 @@ object ReplicationUtils extends Logging {
private val IsrChangeNotificationPrefix = "isr_change_"
- def updateLeaderAndIsr(zkUtils: ZkUtils, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
+ def updateLeaderAndIsr(zkClient: KafkaZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
zkVersion: Int): (Boolean,Int) = {
debug(s"Updated ISR for $topic-$partitionId to ${newLeaderAndIsr.isr.mkString(",")}")
val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId)
- val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
+ val newLeaderData = LeaderAndIsrZNode.encode(newLeaderAndIsr, controllerEpoch)
// use the epoch of the controller that made the leadership decision, instead of the current controller epoch
- val updatePersistentPath: (Boolean, Int) = zkUtils.conditionalUpdatePersistentPath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
+ val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
updatePersistentPath
}
- def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicPartition]): Unit = {
- val isrChangeNotificationPath: String = zkUtils.createSequentialPersistentPath(
+ def propagateIsrChanges(zkClient: KafkaZkClient, isrChangeSet: Set[TopicPartition]): Unit = {
+ val isrChangeNotificationPath: String = zkClient.createSequentialPersistentPath(
ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
generateIsrChangeJson(isrChangeSet))
debug(s"Added $isrChangeNotificationPath for $isrChangeSet")
}
- private def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = {
+ private def checkLeaderAndIsrZkData(zkClient: KafkaZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = {
try {
- val writtenLeaderAndIsrInfo = zkUtils.readDataMaybeNull(path)
- val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
- val writtenStat = writtenLeaderAndIsrInfo._2
+ val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path)
val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
writtenLeaderOpt.foreach { writtenData =>
val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index b419654..110d676 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -25,6 +25,7 @@ import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContex
import kafka.log.LogConfig
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.security.auth.{Acl, Resource, ResourceType}
+
import kafka.server.ConfigType
import kafka.utils._
import kafka.zookeeper._
@@ -49,6 +50,13 @@ import scala.collection.{Seq, mutable}
class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging {
import KafkaZkClient._
+ def createSequentialPersistentPath(path: String, data: String = ""): String = {
+ val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+ val createResponse = retryRequestUntilConnected(createRequest)
+ createResponse.resultException.foreach(e => throw e)
+ createResponse.path
+ }
+
/**
* Gets topic partition states for the given partitions.
* @param partitions the partitions for which we want ot get states.
@@ -933,6 +941,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
createResponse.resultException.foreach(e => throw e)
}
+ def propagateLogDirEvent(brokerId: Int) {
+ val logDirEventNotificationPath: String = createSequentialPersistentPath(
+ LogDirEventNotificationZNode.path + "/" + LogDirEventNotificationSequenceZNode.SequenceNumberPrefix,
+ new String(LogDirEventNotificationSequenceZNode.encode(brokerId), UTF_8))
+ debug(s"Added $logDirEventNotificationPath for broker $brokerId")
+ }
+
/**
* Deletes all Acl change notifications.
* @throws KeeperException if there is an error while deleting Acl change notifications
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index a0085cd..be6efca 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -30,6 +30,13 @@ import org.apache.zookeeper.data.Stat
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
+object LeaderAndIsrZNode {
+ def encode(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
+ Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
+ "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
+ }
+}
+
object ControllerZNode {
def path = "/controller"
def encode(brokerId: Int, timestamp: Long): Array[Byte] =
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 10c7737..46badf0 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -109,7 +109,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
val server = new KafkaServer(config, time) {
override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
- new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
+ new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index efae329..c0871a7 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -25,7 +25,8 @@ import org.easymock.EasyMock
import org.junit._
import org.junit.Assert._
import kafka.cluster.Replica
-import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
+import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
+import kafka.zk.KafkaZkClient
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ class HighwatermarkPersistenceTest {
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
val topic = "foo"
- val zkUtils = EasyMock.createMock(classOf[ZkUtils])
+ val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
val logManagers = configs map { config =>
TestUtils.createLogManager(
logDirs = config.logDirs.map(new File(_)),
@@ -54,7 +55,7 @@ class HighwatermarkPersistenceTest {
@Test
def testHighWatermarkPersistenceSinglePartition() {
// mock zkclient
- EasyMock.replay(zkUtils)
+ EasyMock.replay(zkClient)
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
@@ -62,7 +63,7 @@ class HighwatermarkPersistenceTest {
val metrics = new Metrics
val time = new MockTime
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
+ val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler,
logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
replicaManager.startup()
@@ -86,7 +87,7 @@ class HighwatermarkPersistenceTest {
replicaManager.checkpointHighWatermarks()
fooPartition0Hw = hwmFor(replicaManager, topic, 0)
assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
- EasyMock.verify(zkUtils)
+ EasyMock.verify(zkClient)
} finally {
// shutdown the replica manager upon test completion
replicaManager.shutdown(false)
@@ -100,14 +101,14 @@ class HighwatermarkPersistenceTest {
val topic1 = "foo1"
val topic2 = "foo2"
// mock zkclient
- EasyMock.replay(zkUtils)
+ EasyMock.replay(zkClient)
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startup
val metrics = new Metrics
val time = new MockTime
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
+ val replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient,
scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), logDirFailureChannels.head)
replicaManager.startup()
@@ -155,7 +156,7 @@ class HighwatermarkPersistenceTest {
// verify checkpointed hw for topic 1
topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
assertEquals(10L, topic1Partition0Hw)
- EasyMock.verify(zkUtils)
+ EasyMock.verify(zkClient)
} finally {
// shutdown the replica manager upon test completion
replicaManager.shutdown(false)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 38085b8..334321a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.Replica
import kafka.log.Log
import kafka.utils._
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
@@ -148,7 +149,7 @@ class ReplicaManagerQuotasTest {
}
def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
- val zkUtils = createNiceMock(classOf[ZkUtils])
+ val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
val scheduler = createNiceMock(classOf[KafkaScheduler])
//Create log which handles both a regular read and a 0 bytes read
@@ -182,7 +183,7 @@ class ReplicaManagerQuotasTest {
expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
replay(logManager)
- replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
+ replicaManager = new ReplicaManager(configs.head, metrics, time, zkClient, scheduler, logManager,
new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""),
new BrokerTopicStats, new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b9d884a..ce88688 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -22,9 +22,10 @@ import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean
import kafka.log.LogConfig
-import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
+import kafka.utils.{MockScheduler, MockTime, TestUtils}
import TestUtils.createBroker
import kafka.utils.timer.MockTimer
+import kafka.zk.KafkaZkClient
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -48,14 +49,16 @@ class ReplicaManagerTest {
val time = new MockTime
val metrics = new Metrics
var zkClient: ZkClient = _
- var zkUtils: ZkUtils = _
+ var kafkaZkClient: KafkaZkClient = _
@Before
def setUp() {
zkClient = EasyMock.createMock(classOf[ZkClient])
+ kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+ EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new Properties()).anyTimes()
+ EasyMock.replay(kafkaZkClient)
EasyMock.expect(zkClient.readData(EasyMock.anyString(), EasyMock.anyObject[Stat])).andReturn(null).anyTimes()
EasyMock.replay(zkClient)
- zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
}
@After
@@ -68,7 +71,7 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
- val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+ val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
@@ -87,7 +90,7 @@ class ReplicaManagerTest {
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
- val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+ val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try {
@@ -105,7 +108,7 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
- val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+ val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size), Option(this.getClass.getName))
try {
@@ -138,7 +141,7 @@ class ReplicaManagerTest {
val metadataCache = EasyMock.createMock(classOf[MetadataCache])
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
EasyMock.replay(metadataCache)
- val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+ val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size))
@@ -610,7 +613,7 @@ class ReplicaManagerTest {
val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
- new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
+ new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, Option(this.getClass.getName))
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 92b230e..0797b7b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -23,6 +23,7 @@ import kafka.utils._
import kafka.cluster.Replica
import kafka.log.Log
import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.junit.{After, Before, Test}
@@ -69,8 +70,8 @@ class SimpleFetchTest {
@Before
def setUp() {
// create nice mock since we don't particularly care about zkclient calls
- val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
- EasyMock.replay(zkUtils)
+ val kafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ EasyMock.replay(kafkaZkClient)
// create nice mock since we don't particularly care about scheduler calls
val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
@@ -111,7 +112,7 @@ class SimpleFetchTest {
EasyMock.replay(logManager)
// create the replica manager
- replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
+ replicaManager = new ReplicaManager(configs.head, metrics, time, kafkaZkClient, scheduler, logManager,
new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time, ""), new BrokerTopicStats,
new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
http://git-wip-us.apache.org/repos/asf/kafka/blob/0bc2d0e0/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 02e5bb5..ebab756 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -66,7 +66,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
EasyMock.expect(replicaManager.config).andReturn(configs.head)
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
- EasyMock.expect(replicaManager.zkUtils).andReturn(zkUtils)
+ EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
EasyMock.replay(replicaManager)
zkUtils.makeSurePersistentPathExists(ZkUtils.IsrChangeNotificationPath)
@@ -75,14 +75,14 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
// regular update
val newLeaderAndIsr1 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, 0)
- val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
+ val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkClient,
"my-topic-test", partitionId, newLeaderAndIsr1, controllerEpoch, 0)
assertTrue(updateSucceeded1)
assertEquals(newZkVersion1, 1)
// mismatched zkVersion with the same data
val newLeaderAndIsr2 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, zkVersion + 1)
- val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
+ val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkClient,
"my-topic-test", partitionId, newLeaderAndIsr2, controllerEpoch, zkVersion + 1)
assertTrue(updateSucceeded2)
// returns true with existing zkVersion
@@ -90,7 +90,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
// mismatched zkVersion and leaderEpoch
val newLeaderAndIsr3 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion + 1)
- val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
+ val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkClient,
"my-topic-test", partitionId, newLeaderAndIsr3, controllerEpoch, zkVersion + 1)
assertFalse(updateSucceeded3)
assertEquals(newZkVersion3,-1)