You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/26 04:11:21 UTC
[3/3] kafka git commit: MINOR: Rename and change package of async
ZooKeeper classes
MINOR: Rename and change package of async ZooKeeper classes
- kafka.controller.ZookeeperClient -> kafka.zookeeper.ZooKeeperClient
- kafka.controller.ControllerZkUtils -> kafka.zk.KafkaZkClient
- kafka.controller.ZkData -> kafka.zk.ZkData
- Renamed various fields to match new names and for consistency
- A few clean-ups in ZkData
- Document intent
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Onur Karaman <ok...@linkedin.com>, Manikumar Reddy <ma...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #4112 from ijuma/rename-zookeeper-client-and-move-to-zookeper-package
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ab6f848b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ab6f848b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ab6f848b
Branch: refs/heads/trunk
Commit: ab6f848ba6cafaed3d75b54005c954733f0d1735
Parents: f7f8e11
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Oct 25 21:11:16 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 25 21:11:16 2017 -0700
----------------------------------------------------------------------
.../ZkNodeChangeNotificationListener.scala | 4 +-
.../kafka/controller/KafkaController.scala | 144 ++--
.../controller/KafkaControllerZkUtils.scala | 716 ------------------
.../controller/PartitionStateMachine.scala | 13 +-
.../kafka/controller/ReplicaStateMachine.scala | 9 +-
.../kafka/controller/TopicDeletionManager.scala | 11 +-
.../main/scala/kafka/controller/ZkData.scala | 248 -------
.../kafka/controller/ZookeeperClient.scala | 374 ----------
core/src/main/scala/kafka/log/LogManager.scala | 9 +-
.../main/scala/kafka/server/KafkaServer.scala | 18 +-
core/src/main/scala/kafka/utils/Json.scala | 19 +
core/src/main/scala/kafka/utils/ZkUtils.scala | 3 +
.../src/main/scala/kafka/zk/KafkaZkClient.scala | 726 +++++++++++++++++++
core/src/main/scala/kafka/zk/ZkData.scala | 244 +++++++
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 374 ++++++++++
.../api/SaslPlainPlaintextConsumerTest.scala | 2 +-
.../unit/kafka/admin/DeleteTopicTest.scala | 2 +-
.../ZkNodeChangeNotificationListenerTest.scala | 2 +-
.../unit/kafka/consumer/TopicFilterTest.scala | 2 +-
.../controller/PartitionStateMachineTest.scala | 70 +-
.../controller/ReplicaStateMachineTest.scala | 34 +-
.../kafka/controller/ZookeeperClientTest.scala | 339 ---------
.../kafka/integration/AutoOffsetResetTest.scala | 2 +-
.../security/auth/SimpleAclAuthorizerTest.scala | 4 +-
.../kafka/server/ClientQuotaManagerTest.scala | 6 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 10 +-
.../unit/kafka/server/LogRecoveryTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 8 +-
.../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 10 +-
.../test/scala/unit/kafka/zk/ZKPathTest.scala | 8 +-
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 4 +-
.../kafka/zookeeper/ZooKeeperClientTest.scala | 339 +++++++++
.../integration/utils/EmbeddedKafkaCluster.java | 6 +-
34 files changed, 1902 insertions(+), 1865 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index b4ee1fd..0e34c5a 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -153,11 +153,11 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
}
override def handleSessionEstablishmentError(error: Throwable) {
- fatal("Could not establish session with zookeeper", error)
+ fatal("Could not establish session with ZooKeeper", error)
}
override def handleStateChanged(state: KeeperState) {
- debug(s"New zookeeper state: ${state}")
+ debug(s"New ZooKeeper state: ${state}")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 1df40b3..d3e6998 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -22,10 +22,12 @@ import com.yammer.metrics.core.Gauge
import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
import kafka.utils._
+import kafka.zk._
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper.{ZNodeChangeHandler, ZNodeChildChangeHandler}
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -41,7 +43,7 @@ object KafkaController extends Logging {
val InitialControllerEpochZkVersion = 1
}
-class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
@@ -55,10 +57,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
_ => updateMetrics())
- val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkUtils)
+ val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
- val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
- val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+ val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+ val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
@@ -155,23 +157,23 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
* This ensures another controller election will be triggered and there will always be an actively serving controller
*/
def onControllerFailover() {
- info("Reading controller epoch from zookeeper")
- readControllerEpochFromZookeeper()
- info("Incrementing controller epoch in zookeeper")
+ info("Reading controller epoch from ZooKeeper")
+ readControllerEpochFromZooKeeper()
+ info("Incrementing controller epoch in ZooKeeper")
incrementControllerEpoch()
info("Registering handlers")
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler)
- childChangeHandlers.foreach(zkUtils.registerZNodeChildChangeHandler)
+ childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
- nodeChangeHandlers.foreach(zkUtils.registerZNodeChangeHandlerAndCheckExistence)
+ nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
info("Deleting log dir event notifications")
- zkUtils.deleteLogDirEventNotifications()
+ zkClient.deleteLogDirEventNotifications()
info("Deleting isr change notifications")
- zkUtils.deleteIsrChangeNotifications()
+ zkClient.deleteIsrChangeNotifications()
info("Initializing controller context")
initializeControllerContext()
info("Fetching topic deletions in progress")
@@ -213,10 +215,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
def onControllerResignation() {
debug("Resigning")
// de-register listeners
- zkUtils.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
- zkUtils.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
- zkUtils.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
- zkUtils.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
+ zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
+ zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
+ zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
+ zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
// reset topic deletion manager
topicDeletionManager.reset()
@@ -232,12 +234,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
unregisterPartitionReassignmentIsrChangeHandlers()
// shutdown partition state machine
partitionStateMachine.shutdown()
- zkUtils.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
+ zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
- zkUtils.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
+ zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
// shutdown replica state machine
replicaStateMachine.shutdown()
- zkUtils.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
+ zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
resetControllerContext()
@@ -465,7 +467,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
val partitionReassignmentIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
reassignedPartitionContext.partitionReassignmentIsrChangeHandler = partitionReassignmentIsrChangeHandler
// register listener on the leader and isr path to wait until they catch up with the current leader
- zkUtils.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
+ zkClient.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
}
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -536,7 +538,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
def incrementControllerEpoch(): Unit = {
val newControllerEpoch = controllerContext.epoch + 1
- val setDataResponse = zkUtils.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
+ val setDataResponse = zkClient.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
setDataResponse.resultCode match {
case Code.OK =>
controllerContext.epochZkVersion = setDataResponse.stat.getVersion
@@ -545,7 +547,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
// if path doesn't exist, this is the first controller whose epoch should be 1
// the following call can still fail if another controller gets elected between checking if the path exists and
// trying to create the controller epoch path
- val createResponse = zkUtils.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
+ val createResponse = zkClient.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
createResponse.resultCode match {
case Code.OK =>
controllerContext.epoch = KafkaController.InitialControllerEpoch
@@ -565,10 +567,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
private def initializeControllerContext() {
// update controller cache with delete topic information
- controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster.toSet
- controllerContext.allTopics = zkUtils.getAllTopicsInCluster.toSet
+ controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
+ controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
- controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
+ controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// update the leader and isr cache for all existing partitions from Zookeeper
@@ -582,7 +584,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
- val partitionsUndergoingPreferredReplicaElection = zkUtils.getPreferredReplicaElection
+ val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
// check if they are already completed or topic was deleted
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
@@ -618,7 +620,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
- val partitionsBeingReassigned = zkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+ val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
// check if they are already completed or topic was deleted
val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
@@ -637,7 +639,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
- val topicsToBeDeleted = zkUtils.getTopicDeletions.toSet
+ val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
}.keySet.map(_.topic)
@@ -661,14 +663,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
- val leaderIsrAndControllerEpochs = zkUtils.getTopicPartitionStates(partitions)
+ val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
}
}
private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = {
- zkUtils.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
+ zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
}
}
@@ -720,7 +722,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
replicas: Seq[Int]) {
val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
partitionsAndReplicasForThisTopic.put(partition, replicas)
- val setDataResponse = zkUtils.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
+ val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
if (setDataResponse.resultCode == Code.OK) {
info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(",")))
// update the assigned replica list after a successful zookeeper write
@@ -769,13 +771,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic)
partitionModificationsHandlers.put(topic, partitionModificationsHandler)
}
- partitionModificationsHandlers.values.foreach(zkUtils.registerZNodeChangeHandler)
+ partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
}
def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
topics.foreach { topic =>
partitionModificationsHandlers.remove(topic)
- .foreach(handler => zkUtils.unregisterZNodeChangeHandler(handler.path))
+ .foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
}
}
@@ -784,13 +786,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
case (topicAndPartition, reassignedPartitionsContext) =>
val partitionReassignmentIsrChangeHandler =
reassignedPartitionsContext.partitionReassignmentIsrChangeHandler
- zkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
+ zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
}
}
- private def readControllerEpochFromZookeeper() {
+ private def readControllerEpochFromZooKeeper() {
// initialize the controller epoch and zk version by reading from zookeeper
- val epochAndStatOpt = zkUtils.getControllerEpoch
+ val epochAndStatOpt = zkClient.getControllerEpoch
epochAndStatOpt.foreach { case (epoch, stat) =>
controllerContext.epoch = epoch
controllerContext.epochZkVersion = stat.getVersion
@@ -803,21 +805,21 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
// stop watching the ISR changes for this partition
val partitionReassignmentIsrChangeHandler =
controllerContext.partitionsBeingReassigned(topicAndPartition).partitionReassignmentIsrChangeHandler
- zkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
+ zkClient.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
}
// read the current list of reassigned partitions from zookeeper
- val partitionsBeingReassigned = zkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+ val partitionsBeingReassigned = zkClient.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
// remove this partition from that list
val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
// write the new list to zookeeper
val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
if (reassignment.isEmpty) {
info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
- zkUtils.deletePartitionReassignment()
+ zkClient.deletePartitionReassignment()
} else {
- val setDataResponse = zkUtils.setPartitionReassignmentRaw(reassignment)
+ val setDataResponse = zkClient.setPartitionReassignmentRaw(reassignment)
if (setDataResponse.resultCode == Code.NONODE) {
- val createDataResponse = zkUtils.createPartitionReassignment(reassignment)
+ val createDataResponse = zkClient.createPartitionReassignment(reassignment)
createDataResponse.resultException.foreach(e => throw new AdminOperationException(e))
} else {
setDataResponse.resultException.foreach(e => throw new AdminOperationException(e))
@@ -840,7 +842,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
}
if (!isTriggeredByAutoRebalance)
- zkUtils.deletePreferredReplicaElection()
+ zkClient.deletePreferredReplicaElection()
}
/**
@@ -872,7 +874,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
// refresh leader and isr from zookeeper again
- zkWriteCompleteOrUnnecessary = zkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+ zkWriteCompleteOrUnnecessary = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
case Some(leaderIsrAndControllerEpoch) =>
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -885,7 +887,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
// update the new leadership decision in zookeeper or retry
val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) =
- zkUtils.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
+ zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
if (successfulUpdates.contains(partition)) {
val finalLeaderAndIsr = successfulUpdates(partition)
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
@@ -1065,7 +1067,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
def state = ControllerState.ControllerChange
override def process(): Unit = {
- zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
}
@@ -1111,7 +1113,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
private def triggerControllerMove(): Unit = {
onControllerResignation()
activeControllerId = -1
- zkUtils.deleteController()
+ zkClient.deleteController()
}
def expire(): Unit = {
@@ -1126,7 +1128,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
def elect(): Unit = {
val timestamp = time.milliseconds
- activeControllerId = zkUtils.getControllerId.getOrElse(-1)
+ activeControllerId = zkClient.getControllerId.getOrElse(-1)
/*
* We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
* it's possible that the controller has already been elected when we get here. This check will prevent the following
@@ -1138,14 +1140,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
}
try {
- zkUtils.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
+ zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
info(config.brokerId + " successfully elected as the controller")
activeControllerId = config.brokerId
onControllerFailover()
} catch {
case _: NodeExistsException =>
// If someone else has written the path, then
- activeControllerId = zkUtils.getControllerId.getOrElse(-1)
+ activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (activeControllerId != -1)
debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
@@ -1163,7 +1165,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
- val curBrokers = zkUtils.getAllBrokersInCluster.toSet
+ val curBrokers = zkClient.getAllBrokersInCluster.toSet
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
@@ -1189,13 +1191,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
- val topics = zkUtils.getAllTopicsInCluster.toSet
+ val topics = zkClient.getAllTopicsInCluster.toSet
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
registerPartitionModificationsHandlers(newTopics.toSeq)
- val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics)
+ val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
@@ -1211,13 +1213,13 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
- val sequenceNumbers = zkUtils.getAllLogDirEventNotifications
+ val sequenceNumbers = zkClient.getAllLogDirEventNotifications
try {
- val brokerIds = zkUtils.getBrokerIdsFromLogDirEvents(sequenceNumbers)
+ val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
onBrokerLogDirFailure(brokerIds)
} finally {
// delete processed children
- zkUtils.deleteLogDirEventNotifications(sequenceNumbers)
+ zkClient.deleteLogDirEventNotifications(sequenceNumbers)
}
}
}
@@ -1227,7 +1229,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
- val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(immutable.Set(topic))
+ val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
@@ -1248,12 +1250,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
- var topicsToBeDeleted = zkUtils.getTopicDeletions.toSet
+ var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
- zkUtils.deleteTopicDeletions(nonExistentTopics.toSeq)
+ zkClient.deleteTopicDeletions(nonExistentTopics.toSeq)
}
topicsToBeDeleted --= nonExistentTopics
if (config.deleteTopicEnable) {
@@ -1272,7 +1274,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
} else {
// If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
info(s"Removing $topicsToBeDeleted since delete topic is disabled")
- zkUtils.deleteTopicDeletions(topicsToBeDeleted.toSeq)
+ zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq)
}
}
}
@@ -1282,8 +1284,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
- zkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
- val partitionReassignment = zkUtils.getPartitionReassignment
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
+ val partitionReassignment = zkClient.getPartitionReassignment
val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
partitionsToBeReassigned.foreach { partitionToBeReassigned =>
if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
@@ -1306,7 +1308,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
// check if this partition is still being reassigned or not
controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
- zkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+ zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
@@ -1334,16 +1336,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
- val sequenceNumbers = zkUtils.getAllIsrChangeNotifications
+ val sequenceNumbers = zkClient.getAllIsrChangeNotifications
try {
- val partitions = zkUtils.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
+ val partitions = zkClient.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
if (partitions.nonEmpty) {
updateLeaderAndIsrCache(partitions)
processUpdateNotifications(partitions)
}
} finally {
// delete the notifications
- zkUtils.deleteIsrChangeNotifications(sequenceNumbers)
+ zkClient.deleteIsrChangeNotifications(sequenceNumbers)
}
}
@@ -1359,8 +1361,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
if (!isActive) return
- zkUtils.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
- val partitions = zkUtils.getPreferredReplicaElection
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
+ val partitions = zkClient.getPreferredReplicaElection
val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
if (partitionsForTopicsToBeDeleted.nonEmpty) {
error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
@@ -1375,8 +1377,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
val wasActiveBeforeChange = isActive
- zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
- activeControllerId = zkUtils.getControllerId.getOrElse(-1)
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+ activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (wasActiveBeforeChange && !isActive) {
onControllerResignation()
}
@@ -1388,8 +1390,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: KafkaControllerZkUtils,
override def process(): Unit = {
val wasActiveBeforeChange = isActive
- zkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
- activeControllerId = zkUtils.getControllerId.getOrElse(-1)
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+ activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (wasActiveBeforeChange && !isActive) {
onControllerResignation()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
deleted file mode 100644
index bdd8b57..0000000
--- a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
+++ /dev/null
@@ -1,716 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.controller
-
-import java.util.Properties
-
-import kafka.api.LeaderAndIsr
-import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
-import kafka.log.LogConfig
-import kafka.server.ConfigType
-import kafka.utils.{Logging, ZkUtils}
-import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.data.Stat
-import org.apache.zookeeper.{CreateMode, KeeperException}
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean) extends Logging {
- import KafkaControllerZkUtils._
-
- /**
- * Gets topic partition states for the given partitions.
- * @param partitions the partitions for which we want ot get states.
- * @return sequence of GetDataResponses whose contexts are the partitions they are associated with.
- */
- def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = {
- val getDataRequests = partitions.map { partition =>
- GetDataRequest(TopicPartitionStateZNode.path(partition), ctx = Some(partition))
- }
- retryRequestsUntilConnected(getDataRequests)
- }
-
- /**
- * Sets topic partition states for the given partitions.
- * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
- * @return sequence of SetDataResponse whose contexts are the partitions they are associated with.
- */
- def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
- val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
- val path = TopicPartitionStateZNode.path(partition)
- val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
- SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition))
- }
- retryRequestsUntilConnected(setDataRequests.toSeq)
- }
-
- /**
- * Creates topic partition state znodes for the given partitions.
- * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
- * @return sequence of CreateResponse whose contexts are the partitions they are associated with.
- */
- def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
- createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq)
- createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq)
- val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
- val path = TopicPartitionStateZNode.path(partition)
- val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
- CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition))
- }
- retryRequestsUntilConnected(createRequests.toSeq)
- }
-
- /**
- * Sets the controller epoch conditioned on the given epochZkVersion.
- * @param epoch the epoch to set
- * @param epochZkVersion the expected version number of the epoch znode.
- * @return SetDataResponse
- */
- def setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse = {
- val setDataRequest = SetDataRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), epochZkVersion)
- retryRequestUntilConnected(setDataRequest)
- }
-
- /**
- * Creates the controller epoch znode.
- * @param epoch the epoch to set
- * @return CreateResponse
- */
- def createControllerEpochRaw(epoch: Int): CreateResponse = {
- val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch),
- acls(ControllerEpochZNode.path), CreateMode.PERSISTENT)
- retryRequestUntilConnected(createRequest)
- }
-
- /**
- * Try to update the partition states of multiple partitions in zookeeper.
- * @param leaderAndIsrs The partition states to update.
- * @param controllerEpoch The current controller epoch.
- * @return UpdateLeaderAndIsrResult instance containing per partition results.
- */
- def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
- val successfulUpdates = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
- val updatesToRetry = mutable.Buffer.empty[TopicAndPartition]
- val failed = mutable.Map.empty[TopicAndPartition, Exception]
- val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) }
- val setDataResponses = try {
- setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
- } catch {
- case e: Exception =>
- leaderAndIsrs.keys.foreach(partition => failed.put(partition, e))
- return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
- }
- setDataResponses.foreach { setDataResponse =>
- val partition = setDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
- if (setDataResponse.resultCode == Code.OK) {
- val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
- successfulUpdates.put(partition, updatedLeaderAndIsr)
- } else if (setDataResponse.resultCode == Code.BADVERSION) {
- updatesToRetry += partition
- } else {
- failed.put(partition, setDataResponse.resultException.get)
- }
- }
- UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
- }
-
- /**
- * Get log configs that merge local configs with topic-level configs in zookeeper.
- * @param topics The topics to get log configs for.
- * @param config The local configs.
- * @return A tuple of two values:
- * 1. The successfully gathered log configs
- * 2. Exceptions corresponding to failed log config lookups.
- */
- def getLogConfigs(topics: Seq[String], config: java.util.Map[String, AnyRef]):
- (Map[String, LogConfig], Map[String, Exception]) = {
- val logConfigs = mutable.Map.empty[String, LogConfig]
- val failed = mutable.Map.empty[String, Exception]
- val configResponses = try {
- getTopicConfigs(topics)
- } catch {
- case e: Exception =>
- topics.foreach(topic => failed.put(topic, e))
- return (logConfigs.toMap, failed.toMap)
- }
- configResponses.foreach { configResponse =>
- val topic = configResponse.ctx.get.asInstanceOf[String]
- if (configResponse.resultCode == Code.OK) {
- val overrides = ConfigEntityZNode.decode(configResponse.data)
- val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
- logConfigs.put(topic, logConfig)
- } else if (configResponse.resultCode == Code.NONODE) {
- val logConfig = LogConfig.fromProps(config, new Properties)
- logConfigs.put(topic, logConfig)
- } else {
- failed.put(topic, configResponse.resultException.get)
- }
- }
- (logConfigs.toMap, failed.toMap)
- }
-
- /**
- * Gets all brokers in the cluster.
- * @return sequence of brokers in the cluster.
- */
- def getAllBrokersInCluster: Seq[Broker] = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
- if (getChildrenResponse.resultCode == Code.OK) {
- val brokerIds = getChildrenResponse.children.map(_.toInt)
- val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
- val getDataResponses = retryRequestsUntilConnected(getDataRequests)
- getDataResponses.flatMap { getDataResponse =>
- val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
- if (getDataResponse.resultCode == Code.OK) {
- Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
- } else if (getDataResponse.resultCode == Code.NONODE) {
- None
- } else {
- throw getDataResponse.resultException.get
- }
- }
- } else if (getChildrenResponse.resultCode == Code.NONODE) {
- Seq.empty
- } else {
- throw getChildrenResponse.resultException.get
- }
- }
-
- /**
- * Gets all topics in the cluster.
- * @return sequence of topics in the cluster.
- */
- def getAllTopicsInCluster: Seq[String] = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path))
- if (getChildrenResponse.resultCode == Code.OK) {
- getChildrenResponse.children
- } else if (getChildrenResponse.resultCode == Code.NONODE) {
- Seq.empty
- } else {
- throw getChildrenResponse.resultException.get
- }
- }
-
- /**
- * Sets the topic znode with the given assignment.
- * @param topic the topic whose assignment is being set.
- * @param assignment the partition to replica mapping to set for the given topic
- * @return SetDataResponse
- */
- def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
- val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), -1)
- retryRequestUntilConnected(setDataRequest)
- }
-
- /**
- * Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path.
- * @return sequence of znode names and not the absolute znode path.
- */
- def getAllLogDirEventNotifications: Seq[String] = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
- if (getChildrenResponse.resultCode == Code.OK) {
- getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)
- } else if (getChildrenResponse.resultCode == Code.NONODE) {
- Seq.empty
- } else {
- throw getChildrenResponse.resultException.get
- }
- }
-
- /**
- * Reads each of the log dir event notifications associated with the given sequence numbers and extracts the broker ids.
- * @param sequenceNumbers the sequence numbers associated with the log dir event notifications.
- * @return broker ids associated with the given log dir event notifications.
- */
- def getBrokerIdsFromLogDirEvents(sequenceNumbers: Seq[String]): Seq[Int] = {
- val getDataRequests = sequenceNumbers.map { sequenceNumber =>
- GetDataRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber))
- }
- val getDataResponses = retryRequestsUntilConnected(getDataRequests)
- getDataResponses.flatMap { getDataResponse =>
- if (getDataResponse.resultCode == Code.OK) {
- LogDirEventNotificationSequenceZNode.decode(getDataResponse.data)
- } else if (getDataResponse.resultCode == Code.NONODE) {
- None
- } else {
- throw getDataResponse.resultException.get
- }
- }
- }
-
- /**
- * Deletes all log dir event notifications.
- */
- def deleteLogDirEventNotifications(): Unit = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
- if (getChildrenResponse.resultCode == Code.OK) {
- deleteLogDirEventNotifications(getChildrenResponse.children)
- } else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
- }
- }
-
- /**
- * Deletes the log dir event notifications associated with the given sequence numbers.
- * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted.
- */
- def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
- val deleteRequests = sequenceNumbers.map { sequenceNumber =>
- DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1)
- }
- retryRequestsUntilConnected(deleteRequests)
- }
-
- /**
- * Gets the assignments for the given topics.
- * @param topics the topics whose partitions we wish to get the assignments for.
- * @return the replica assignment for each partition from the given topics.
- */
- def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = {
- val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
- val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
- getDataResponses.flatMap { getDataResponse =>
- val topic = getDataResponse.ctx.get.asInstanceOf[String]
- if (getDataResponse.resultCode == Code.OK) {
- TopicZNode.decode(topic, getDataResponse.data)
- } else if (getDataResponse.resultCode == Code.NONODE) {
- Map.empty[TopicAndPartition, Seq[Int]]
- } else {
- throw getDataResponse.resultException.get
- }
- }.toMap
- }
-
- /**
- * Get all topics marked for deletion.
- * @return sequence of topics marked for deletion.
- */
- def getTopicDeletions: Seq[String] = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path))
- if (getChildrenResponse.resultCode == Code.OK) {
- getChildrenResponse.children
- } else if (getChildrenResponse.resultCode == Code.NONODE) {
- Seq.empty
- } else {
- throw getChildrenResponse.resultException.get
- }
- }
-
- /**
- * Remove the given topics from the topics marked for deletion.
- * @param topics the topics to remove.
- */
- def deleteTopicDeletions(topics: Seq[String]): Unit = {
- val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1))
- retryRequestsUntilConnected(deleteRequests)
- }
-
- /**
- * Returns all reassignments.
- * @return the reassignments for each partition.
- */
- def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = {
- val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
- val getDataResponse = retryRequestUntilConnected(getDataRequest)
- if (getDataResponse.resultCode == Code.OK) {
- ReassignPartitionsZNode.decode(getDataResponse.data)
- } else if (getDataResponse.resultCode == Code.NONODE) {
- Map.empty[TopicAndPartition, Seq[Int]]
- } else {
- throw getDataResponse.resultException.get
- }
- }
-
- /**
- * Sets the partition reassignment znode with the given reassignment.
- * @param reassignment the reassignment to set on the reassignment znode.
- * @return SetDataResponse
- */
- def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
- val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), -1)
- retryRequestUntilConnected(setDataRequest)
- }
-
- /**
- * Creates the partition reassignment znode with the given reassignment.
- * @param reassignment the reassignment to set on the reassignment znode.
- * @return CreateResponse
- */
- def createPartitionReassignment(reassignment: Map[TopicAndPartition, Seq[Int]]): CreateResponse = {
- val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
- acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
- retryRequestUntilConnected(createRequest)
- }
-
- /**
- * Deletes the partition reassignment znode.
- */
- def deletePartitionReassignment(): Unit = {
- val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, -1)
- retryRequestUntilConnected(deleteRequest)
- }
-
- /**
- * Gets topic partition states for the given partitions.
- * @param partitions the partitions for which we want ot get states.
- * @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state.
- */
- def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
- val getDataResponses = getTopicPartitionStatesRaw(partitions)
- getDataResponses.flatMap { getDataResponse =>
- val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
- if (getDataResponse.resultCode == Code.OK) {
- TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
- } else if (getDataResponse.resultCode == Code.NONODE) {
- None
- } else {
- throw getDataResponse.resultException.get
- }
- }.toMap
- }
-
- /**
- * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
- * @return sequence of znode names and not the absolute znode path.
- */
- def getAllIsrChangeNotifications: Seq[String] = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
- if (getChildrenResponse.resultCode == Code.OK) {
- getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber)
- } else if (getChildrenResponse.resultCode == Code.NONODE) {
- Seq.empty
- } else {
- throw getChildrenResponse.resultException.get
- }
- }
-
- /**
- * Reads each of the isr change notifications associated with the given sequence numbers and extracts the partitions.
- * @param sequenceNumbers the sequence numbers associated with the isr change notifications.
- * @return partitions associated with the given isr change notifications.
- */
- def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = {
- val getDataRequests = sequenceNumbers.map { sequenceNumber =>
- GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber))
- }
- val getDataResponses = retryRequestsUntilConnected(getDataRequests)
- getDataResponses.flatMap { getDataResponse =>
- if (getDataResponse.resultCode == Code.OK) {
- IsrChangeNotificationSequenceZNode.decode(getDataResponse.data)
- } else if (getDataResponse.resultCode == Code.NONODE) {
- None
- } else {
- throw getDataResponse.resultException.get
- }
- }
- }
-
- /**
- * Deletes all isr change notifications.
- */
- def deleteIsrChangeNotifications(): Unit = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
- if (getChildrenResponse.resultCode == Code.OK) {
- deleteIsrChangeNotifications(getChildrenResponse.children)
- } else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
- }
- }
-
- /**
- * Deletes the isr change notifications associated with the given sequence numbers.
- * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted.
- */
- def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
- val deleteRequests = sequenceNumbers.map { sequenceNumber =>
- DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), -1)
- }
- retryRequestsUntilConnected(deleteRequests)
- }
-
- /**
- * Gets the partitions marked for preferred replica election.
- * @return sequence of partitions.
- */
- def getPreferredReplicaElection: Set[TopicAndPartition] = {
- val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path)
- val getDataResponse = retryRequestUntilConnected(getDataRequest)
- if (getDataResponse.resultCode == Code.OK) {
- PreferredReplicaElectionZNode.decode(getDataResponse.data)
- } else if (getDataResponse.resultCode == Code.NONODE) {
- Set.empty[TopicAndPartition]
- } else {
- throw getDataResponse.resultException.get
- }
- }
-
- /**
- * Deletes the preferred replica election znode.
- */
- def deletePreferredReplicaElection(): Unit = {
- val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, -1)
- retryRequestUntilConnected(deleteRequest)
- }
-
- /**
- * Gets the controller id.
- * @return optional integer that is Some if the controller znode exists and can be parsed and None otherwise.
- */
- def getControllerId: Option[Int] = {
- val getDataRequest = GetDataRequest(ControllerZNode.path)
- val getDataResponse = retryRequestUntilConnected(getDataRequest)
- if (getDataResponse.resultCode == Code.OK) {
- ControllerZNode.decode(getDataResponse.data)
- } else if (getDataResponse.resultCode == Code.NONODE) {
- None
- } else {
- throw getDataResponse.resultException.get
- }
- }
-
- /**
- * Deletes the controller znode.
- */
- def deleteController(): Unit = {
- val deleteRequest = DeleteRequest(ControllerZNode.path, -1)
- retryRequestUntilConnected(deleteRequest)
- }
-
- /**
- * Gets the controller epoch.
- * @return optional (Int, Stat) that is Some if the controller epoch path exists and None otherwise.
- */
- def getControllerEpoch: Option[(Int, Stat)] = {
- val getDataRequest = GetDataRequest(ControllerEpochZNode.path)
- val getDataResponse = retryRequestUntilConnected(getDataRequest)
- if (getDataResponse.resultCode == Code.OK) {
- val epoch = ControllerEpochZNode.decode(getDataResponse.data)
- Option(epoch, getDataResponse.stat)
- } else if (getDataResponse.resultCode == Code.NONODE) {
- None
- } else {
- throw getDataResponse.resultException.get
- }
- }
-
- /**
- * Recursively deletes the topic znode.
- * @param topic the topic whose topic znode we wish to delete.
- */
- def deleteTopicZNode(topic: String): Unit = {
- deleteRecursive(TopicZNode.path(topic))
- }
-
- /**
- * Deletes the topic configs for the given topics.
- * @param topics the topics whose configs we wish to delete.
- */
- def deleteTopicConfigs(topics: Seq[String]): Unit = {
- val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), -1))
- retryRequestsUntilConnected(deleteRequests)
- }
-
- /**
- * This registers a ZNodeChangeHandler and attempts to register a watcher with an ExistsRequest, which allows data watcher
- * registrations on paths which might not even exist.
- *
- * @param zNodeChangeHandler
- */
- def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
- zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
- val existsResponse = retryRequestUntilConnected(ExistsRequest(zNodeChangeHandler.path))
- if (existsResponse.resultCode != Code.OK && existsResponse.resultCode != Code.NONODE) {
- throw existsResponse.resultException.get
- }
- }
-
- /**
- * See ZookeeperClient.registerZNodeChangeHandler
- * @param zNodeChangeHandler
- */
- def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
- zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
- }
-
- /**
- * See ZookeeperClient.unregisterZNodeChangeHandler
- * @param path
- */
- def unregisterZNodeChangeHandler(path: String): Unit = {
- zookeeperClient.unregisterZNodeChangeHandler(path)
- }
-
- /**
- * See ZookeeperClient.registerZNodeChildChangeHandler
- * @param zNodeChildChangeHandler
- */
- def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
- zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
- }
-
- /**
- * See ZookeeperClient.unregisterZNodeChildChangeHandler
- * @param path
- */
- def unregisterZNodeChildChangeHandler(path: String): Unit = {
- zookeeperClient.unregisterZNodeChildChangeHandler(path)
- }
-
- /**
- * Close the underlying ZookeeperClient.
- */
- def close(): Unit = {
- zookeeperClient.close()
- }
-
- private def deleteRecursive(path: String): Unit = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
- if (getChildrenResponse.resultCode == Code.OK) {
- getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
- val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, -1))
- if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
- throw deleteResponse.resultException.get
- }
- } else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
- }
- }
- private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
- val createRequests = partitions.map { partition =>
- val path = TopicPartitionZNode.path(partition)
- val data = TopicPartitionZNode.encode
- CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition))
- }
- retryRequestsUntilConnected(createRequests)
- }
-
- private def createTopicPartitions(topics: Seq[String]) = {
- val createRequests = topics.map { topic =>
- val path = TopicPartitionsZNode.path(topic)
- val data = TopicPartitionsZNode.encode
- CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(topic))
- }
- retryRequestsUntilConnected(createRequests)
- }
-
- private def getTopicConfigs(topics: Seq[String]) = {
- val getDataRequests = topics.map { topic =>
- GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
- }
- retryRequestsUntilConnected(getDataRequests)
- }
-
- private def acls(path: String) = {
- import scala.collection.JavaConverters._
- ZkUtils.defaultAcls(isSecure, path).asScala
- }
-
- private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
- retryRequestsUntilConnected(Seq(request)).head
- }
-
- private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
- val remainingRequests = ArrayBuffer(requests: _*)
- val responses = new ArrayBuffer[Req#Response]
- while (remainingRequests.nonEmpty) {
- val batchResponses = zookeeperClient.handleRequests(remainingRequests)
-
- // Only execute slow path if we find a response with CONNECTIONLOSS
- if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) {
- val requestResponsePairs = remainingRequests.zip(batchResponses)
-
- remainingRequests.clear()
- requestResponsePairs.foreach { case (request, response) =>
- if (response.resultCode == Code.CONNECTIONLOSS)
- remainingRequests += request
- else
- responses += response
- }
-
- if (remainingRequests.nonEmpty)
- zookeeperClient.waitUntilConnected()
- } else {
- remainingRequests.clear()
- responses ++= batchResponses
- }
- }
- responses
- }
-
- def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
- val checkedEphemeral = new CheckedEphemeral(path, data)
- info(s"Creating $path (is it secure? $isSecure)")
- val code = checkedEphemeral.create()
- info(s"Result of znode creation at $path is: $code")
- code match {
- case Code.OK =>
- case _ => throw KeeperException.create(code)
- }
- }
-
- private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
- def create(): Code = {
- val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
- val createResponse = retryRequestUntilConnected(createRequest)
- val code = createResponse.resultCode
- if (code == Code.OK) {
- code
- } else if (code == Code.NODEEXISTS) {
- get()
- } else {
- error(s"Error while creating ephemeral at $path with return code: $code")
- code
- }
- }
-
- private def get(): Code = {
- val getDataRequest = GetDataRequest(path)
- val getDataResponse = retryRequestUntilConnected(getDataRequest)
- val code = getDataResponse.resultCode
- if (code == Code.OK) {
- if (getDataResponse.stat.getEphemeralOwner != zookeeperClient.sessionId) {
- error(s"Error while creating ephemeral at $path with return code: $code")
- Code.NODEEXISTS
- } else {
- code
- }
- } else if (code == Code.NONODE) {
- info(s"The ephemeral node at $path went away while reading it")
- create()
- } else {
- error(s"Error while creating ephemeral at $path with return code: $code")
- code
- }
- }
- }
-}
-
-object KafkaControllerZkUtils {
-
- /**
- * @param successfulPartitions The successfully updated partition states with adjusted znode versions.
- * @param partitionsToRetry The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts
- * can occur if the partition leader updated partition state while the controller attempted to
- * update partition state.
- * @param failedPartitions Exceptions corresponding to failed partition state updates.
- */
- case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicAndPartition, LeaderAndIsr],
- partitionsToRetry: Seq[TopicAndPartition],
- failedPartitions: Map[TopicAndPartition, Exception])
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 1c87b5e..1dee71d 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -18,9 +18,10 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.common.{StateChangeFailedException, TopicAndPartition}
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
import kafka.server.KafkaConfig
import kafka.utils.Logging
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
@@ -43,7 +44,7 @@ class PartitionStateMachine(config: KafkaConfig,
stateChangeLogger: StateChangeLogger,
controllerContext: ControllerContext,
topicDeletionManager: TopicDeletionManager,
- zkUtils: KafkaControllerZkUtils,
+ zkClient: KafkaZkClient,
partitionState: mutable.Map[TopicAndPartition, PartitionState],
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
private val controllerId = config.brokerId
@@ -217,7 +218,7 @@ class PartitionStateMachine(config: KafkaConfig,
partition -> leaderIsrAndControllerEpoch
}.toMap
val createResponses = try {
- zkUtils.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+ zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
} catch {
case e: Exception =>
partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
@@ -278,7 +279,7 @@ class PartitionStateMachine(config: KafkaConfig,
private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
(Seq[TopicAndPartition], Seq[TopicAndPartition], Map[TopicAndPartition, Exception]) = {
val getDataResponses = try {
- zkUtils.getTopicPartitionStatesRaw(partitions)
+ zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
@@ -331,7 +332,7 @@ class PartitionStateMachine(config: KafkaConfig,
}
val recipientsPerPartition = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> recipients }.toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, recipients) => partition -> leaderAndIsrOpt.get }.toMap
- val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(
+ val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
successfulUpdates.foreach { case (partition, leaderAndIsr) =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
@@ -349,7 +350,7 @@ class PartitionStateMachine(config: KafkaConfig,
val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
liveInSyncReplicas.isEmpty
}
- val (logConfigs, failed) = zkUtils.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
+ val (logConfigs, failed) = zkClient.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
val partitionsWithUncleanLeaderElectionState = partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
if (failed.contains(partition.topic)) {
logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 4da1c7b..e41007b 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -19,9 +19,10 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.common.{StateChangeFailedException, TopicAndPartition}
import kafka.controller.Callbacks.CallbackBuilder
-import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult
import kafka.server.KafkaConfig
import kafka.utils.Logging
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import org.apache.zookeeper.KeeperException.Code
import scala.collection.mutable
@@ -48,7 +49,7 @@ class ReplicaStateMachine(config: KafkaConfig,
stateChangeLogger: StateChangeLogger,
controllerContext: ControllerContext,
topicDeletionManager: TopicDeletionManager,
- zkUtils: KafkaControllerZkUtils,
+ zkClient: KafkaZkClient,
replicaState: mutable.Map[PartitionAndReplica, ReplicaState],
controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
private val controllerId = config.brokerId
@@ -292,7 +293,7 @@ class ReplicaStateMachine(config: KafkaConfig,
val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
}
- val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(
+ val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
@@ -325,7 +326,7 @@ class ReplicaStateMachine(config: KafkaConfig,
val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicAndPartition]
val failed = mutable.Map.empty[TopicAndPartition, Exception]
val getDataResponses = try {
- zkUtils.getTopicPartitionStatesRaw(partitions)
+ zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
partitions.foreach(partition => failed.put(partition, e))
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 52302a3..2e93f9d 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -19,6 +19,7 @@ package kafka.controller
import kafka.common.TopicAndPartition
import kafka.utils.Logging
+import kafka.zk.KafkaZkClient
import scala.collection.{Set, mutable}
@@ -57,7 +58,7 @@ import scala.collection.{Set, mutable}
*/
class TopicDeletionManager(controller: KafkaController,
eventManager: ControllerEventManager,
- kafkaControllerZkUtils: KafkaControllerZkUtils) extends Logging {
+ zkClient: KafkaZkClient) extends Logging {
this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
val controllerContext = controller.controllerContext
val isDeleteTopicEnabled = controller.config.deleteTopicEnable
@@ -73,7 +74,7 @@ class TopicDeletionManager(controller: KafkaController,
} else {
// if delete topic is disabled clean the topic entries under /admin/delete_topics
info("Removing " + initialTopicsToBeDeleted + " since delete topic is disabled")
- kafkaControllerZkUtils.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
+ zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
}
}
@@ -239,9 +240,9 @@ class TopicDeletionManager(controller: KafkaController,
controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
- kafkaControllerZkUtils.deleteTopicZNode(topic)
- kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic))
- kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic))
+ zkClient.deleteTopicZNode(topic)
+ zkClient.deleteTopicConfigs(Seq(topic))
+ zkClient.deleteTopicDeletions(Seq(topic))
controllerContext.removeTopic(topic)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/main/scala/kafka/controller/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZkData.scala b/core/src/main/scala/kafka/controller/ZkData.scala
deleted file mode 100644
index 2240b6a..0000000
--- a/core/src/main/scala/kafka/controller/ZkData.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.controller
-
-import java.util.Properties
-
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
-import kafka.cluster.{Broker, EndPoint}
-import kafka.common.TopicAndPartition
-import kafka.utils.Json
-import org.apache.zookeeper.data.Stat
-
-import scala.collection.Seq
-
-object ControllerZNode {
- def path = "/controller"
- def encode(brokerId: Int, timestamp: Long): Array[Byte] =
- Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)).getBytes("UTF-8")
- def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
- js.asJsonObject("brokerid").to[Int]
- }
-}
-
-object ControllerEpochZNode {
- def path = "/controller_epoch"
- def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes("UTF-8")
- def decode(bytes: Array[Byte]) : Int = new String(bytes, "UTF-8").toInt
-}
-
-object ConfigZNode {
- def path = "/config"
- def encode: Array[Byte] = null
-}
-
-object BrokersZNode {
- def path = "/brokers"
- def encode: Array[Byte] = null
-}
-
-object BrokerIdsZNode {
- def path = s"${BrokersZNode.path}/ids"
- def encode: Array[Byte] = null
-}
-
-object BrokerIdZNode {
- def path(id: Int) = s"${BrokerIdsZNode.path}/$id"
- def encode(id: Int,
- host: String,
- port: Int,
- advertisedEndpoints: Seq[EndPoint],
- jmxPort: Int,
- rack: Option[String],
- apiVersion: ApiVersion): Array[Byte] = {
- val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
- Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes("UTF-8")
- }
-
- def decode(id: Int, bytes: Array[Byte]): Broker = {
- Broker.createBroker(id, new String(bytes, "UTF-8"))
- }
-}
-
-object TopicsZNode {
- def path = s"${BrokersZNode.path}/topics"
- def encode: Array[Byte] = null
-}
-
-object TopicZNode {
- def path(topic: String) = s"${TopicsZNode.path}/$topic"
- def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
- val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
- Json.encode(Map("version" -> 1, "partitions" -> assignmentJson)).getBytes("UTF-8")
- }
- def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
- Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
- val assignmentJson = js.asJsonObject
- val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject)
- partitionsJsonOpt.map { partitionsJson =>
- partitionsJson.iterator.map { case (partition, replicas) =>
- TopicAndPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
- }
- }
- }.map(_.toMap).getOrElse(Map.empty)
- }
-}
-
-object TopicPartitionsZNode {
- def path(topic: String) = s"${TopicZNode.path(topic)}/partitions"
- def encode: Array[Byte] = null
-}
-
-object TopicPartitionZNode {
- def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
- def encode: Array[Byte] = null
-}
-
-object TopicPartitionStateZNode {
- def path(partition: TopicAndPartition) = s"${TopicPartitionZNode.path(partition)}/state"
- def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
- val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
- val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
- Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
- "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)).getBytes("UTF-8")
- }
- def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
- Json.parseFull(new String(bytes, "UTF-8")).map { js =>
- val leaderIsrAndEpochInfo = js.asJsonObject
- val leader = leaderIsrAndEpochInfo("leader").to[Int]
- val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
- val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
- val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
- val zkPathVersion = stat.getVersion
- LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)
- }
- }
-}
-
-object ConfigEntityTypeZNode {
- def path(entityType: String) = s"${ConfigZNode.path}/$entityType"
- def encode: Array[Byte] = null
-}
-
-object ConfigEntityZNode {
- def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName"
- def encode(config: Properties): Array[Byte] = {
- import scala.collection.JavaConverters._
- Json.encode(Map("version" -> 1, "config" -> config.asScala)).getBytes("UTF-8")
- }
- def decode(bytes: Array[Byte]): Option[Properties] = {
- Json.parseFull(new String(bytes, "UTF-8")).map { js =>
- val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
- val props = new Properties()
- configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
- props
- }
- }
-}
-
-object IsrChangeNotificationZNode {
- def path = "/isr_change_notification"
- def encode: Array[Byte] = null
-}
-
-object IsrChangeNotificationSequenceZNode {
- val SequenceNumberPrefix = "isr_change_"
- def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
- def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
- val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
- Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson)).getBytes("UTF-8")
- }
-
- def decode(bytes: Array[Byte]): Set[TopicAndPartition] = {
- Json.parseFull(new String(bytes, "UTF-8")).map { js =>
- val partitionsJson = js.asJsonObject("partitions").asJsonArray
- partitionsJson.iterator.map { partitionsJson =>
- val partitionJson = partitionsJson.asJsonObject
- val topic = partitionJson("topic").to[String]
- val partition = partitionJson("partition").to[Int]
- TopicAndPartition(topic, partition)
- }
- }
- }.map(_.toSet).getOrElse(Set.empty)
- def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
-}
-
-object LogDirEventNotificationZNode {
- def path = "/log_dir_event_notification"
- def encode: Array[Byte] = null
-}
-
-object LogDirEventNotificationSequenceZNode {
- val SequenceNumberPrefix = "log_dir_event_"
- val LogDirFailureEvent = 1
- def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
- def encode(brokerId: Int) =
- Json.encode(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent)).getBytes("UTF-8")
- def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
- js.asJsonObject("broker").to[Int]
- }
- def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
-}
-
-object AdminZNode {
- def path = "/admin"
- def encode: Array[Byte] = null
-}
-
-object DeleteTopicsZNode {
- def path = s"${AdminZNode.path}/delete_topics"
- def encode: Array[Byte] = null
-}
-
-object DeleteTopicsTopicZNode {
- def path(topic: String) = s"${DeleteTopicsZNode.path}/$topic"
- def encode: Array[Byte] = null
-}
-
-object ReassignPartitionsZNode {
- def path = s"${AdminZNode.path}/reassign_partitions"
- def encode(reassignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
- val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) =>
- Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
- }
- Json.encode(Map("version" -> 1, "partitions" -> reassignmentJson)).getBytes("UTF-8")
- }
- def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
- val reassignmentJson = js.asJsonObject
- val partitionsJsonOpt = reassignmentJson.get("partitions")
- partitionsJsonOpt.map { partitionsJson =>
- partitionsJson.asJsonArray.iterator.map { partitionFieldsJs =>
- val partitionFields = partitionFieldsJs.asJsonObject
- val topic = partitionFields("topic").to[String]
- val partition = partitionFields("partition").to[Int]
- val replicas = partitionFields("replicas").to[Seq[Int]]
- TopicAndPartition(topic, partition) -> replicas
- }
- }
- }.map(_.toMap).getOrElse(Map.empty)
-}
-
-object PreferredReplicaElectionZNode {
- def path = s"${AdminZNode.path}/preferred_replica_election"
- def encode(partitions: Set[TopicAndPartition]): Array[Byte] =
- Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))).getBytes("UTF-8")
- def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
- val partitionsJson = js.asJsonObject("partitions").asJsonArray
- partitionsJson.iterator.map { partitionsJson =>
- val partitionJson = partitionsJson.asJsonObject
- val topic = partitionJson("topic").to[String]
- val partition = partitionJson("partition").to[Int]
- TopicAndPartition(topic, partition)
- }
- }.map(_.toSet).getOrElse(Set.empty)
-}
\ No newline at end of file