You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/19 00:24:09 UTC
[4/5] kafka git commit: KAFKA-2639: Refactoring of ZkUtils
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 2027ec8..f39b9a1 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -37,6 +37,7 @@ import kafka.utils.ZkUtils._
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
+import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.Watcher.Event.KeeperState
import scala.collection._
@@ -88,8 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
private var fetcher: Option[ConsumerFetcherManager] = None
- private var zkClient: ZkClient = null
- private var zkConnection : ZkConnection = null
+ private var zkUtils: ZkUtils = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
@@ -173,21 +173,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def createFetcher() {
if (enableFetcher)
- fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
+ fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkUtils))
}
private def connectZk() {
info("Connecting to zookeeper instance at " + config.zkConnect)
- val (client, connection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
- zkClient = client
- zkConnection = connection
+ zkUtils = ZkUtils(config.zkConnect,
+ config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
}
// Blocks until the offset manager is located and a channel is established to it.
private def ensureOffsetManagerConnected() {
if (config.offsetsStorage == "kafka") {
if (offsetsChannel == null || !offsetsChannel.isConnected)
- offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient,
+ offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkUtils,
config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs)
debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port))
@@ -213,9 +214,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
sendShutdownToAllQueues()
if (config.autoCommitEnable)
commitOffsets(true)
- if (zkClient != null) {
- zkClient.close()
- zkClient = null
+ if (zkUtils != null) {
+ zkUtils.close()
+ zkUtils = null
}
if (offsetsChannel != null) offsetsChannel.disconnect()
@@ -266,7 +267,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
consumerRegistryDir + "/" + consumerIdString,
consumerRegistrationInfo,
- zkConnection.getZookeeper)
+ zkUtils.zkConnection.getZookeeper,
+ false)
zkWatchedEphemeral.create()
info("end registering consumer " + consumerIdString + " in ZK")
@@ -296,7 +298,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
if (checkpointedZkOffsets.get(topicPartition) != offset) {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
- updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
+ zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
checkpointedZkOffsets.put(topicPartition, offset)
zkCommitMeter.mark()
}
@@ -404,7 +406,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
- val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
+ val offsetString = zkUtils.readDataMaybeNull(dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
offsetString match {
case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong))
case None => (topicPartition, OffsetMetadataAndError.NoOffset)
@@ -599,7 +601,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def deletePartitionOwnershipFromZK(topic: String, partition: Int) {
val topicDirs = new ZKGroupTopicDirs(group, topic)
val znode = topicDirs.consumerOwnerDir + "/" + partition
- deletePath(zkClient, znode)
+ zkUtils.deletePath(znode)
debug("Consumer " + consumerIdString + " releasing " + znode)
}
@@ -630,7 +632,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
var done = false
var cluster: Cluster = null
try {
- cluster = getCluster(zkClient)
+ cluster = zkUtils.getCluster()
done = rebalance(cluster)
} catch {
case e: Throwable =>
@@ -660,14 +662,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def rebalance(cluster: Cluster): Boolean = {
val myTopicThreadIdsMap = TopicCount.constructTopicCount(
- group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
- val brokers = getAllBrokersInCluster(zkClient)
+ group, consumerIdString, zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
+ val brokers = zkUtils.getAllBrokersInCluster()
if (brokers.size == 0) {
// This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
// We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
// are up.
warn("no brokers found when trying to rebalance.")
- zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
+ zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
true
}
else {
@@ -690,7 +692,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
)
}
releasePartitionOwnership(topicRegistry)
- val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
+ val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkUtils)
val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
@@ -713,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
})
/**
- * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+ * move the partition ownership here, since that can be used to indicate a truly successful re-balancing attempt
* A rebalancing attempt is completed successfully only after the fetchers have been started correctly
*/
if(reflectPartitionOwnershipDecision(partitionAssignment)) {
@@ -832,9 +834,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topic = partitionOwner._1.topic
val partition = partitionOwner._1.partition
val consumerThreadId = partitionOwner._2
- val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
+ val partitionOwnerPath = zkUtils.getConsumerPartitionOwnerPath(group, topic, partition)
try {
- createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId.toString)
+ zkUtils.createEphemeralPathExpectConflict(partitionOwnerPath, consumerThreadId.toString)
info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
successfullyOwnedPartitions ::= (topic, partition)
true
@@ -951,14 +953,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
})
// listener to consumer and partition changes
- zkClient.subscribeStateChanges(sessionExpirationListener)
+ zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
- zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+ zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
topicStreamsMap.foreach { topicAndStreams =>
// register on broker partition path changes
val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
- zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
+ zkUtils.zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
}
// explicitly trigger load balancing for this consumer
@@ -988,11 +990,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// bootstrap with existing topics
private var wildcardTopics =
- getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+ zkUtils.getChildrenParentMayNotExist(BrokerTopicsPath)
.filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))
private val wildcardTopicCount = TopicCount.constructTopicCount(
- consumerIdString, topicFilter, numStreams, zkClient, config.excludeInternalTopics)
+ consumerIdString, topicFilter, numStreams, zkUtils, config.excludeInternalTopics)
val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
@@ -1002,7 +1004,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
* Topic events will trigger subsequent synced rebalances.
*/
info("Creating topic event watcher for topics " + topicFilter)
- wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkClient, this)
+ wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkUtils, this)
def handleTopicEvent(allTopics: Seq[String]) {
debug("Handling topic event")
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index f74823b..0cd22f0 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -22,7 +22,7 @@ import kafka.utils.{ZkUtils, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
-class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
+class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
val eventHandler: TopicEventHandler[String]) extends Logging {
val lock = new Object()
@@ -31,24 +31,24 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
private def startWatchingTopicEvents() {
val topicEventListener = new ZkTopicEventListener()
- ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
+ zkUtils.makeSurePersistentPathExists(ZkUtils.BrokerTopicsPath)
- zkClient.subscribeStateChanges(
+ zkUtils.zkClient.subscribeStateChanges(
new ZkSessionExpireListener(topicEventListener))
- val topics = zkClient.subscribeChildChanges(
+ val topics = zkUtils.zkClient.subscribeChildChanges(
ZkUtils.BrokerTopicsPath, topicEventListener).toList
// call to bootstrap topic list
topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics)
}
- private def stopWatchingTopicEvents() { zkClient.unsubscribeAll() }
+ private def stopWatchingTopicEvents() { zkUtils.zkClient.unsubscribeAll() }
def shutdown() {
lock.synchronized {
info("Shutting down topic event watcher.")
- if (zkClient != null) {
+ if (zkUtils != null) {
stopWatchingTopicEvents()
}
else {
@@ -63,8 +63,8 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
def handleChildChange(parent: String, children: java.util.List[String]) {
lock.synchronized {
try {
- if (zkClient != null) {
- val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
+ if (zkUtils != null) {
+ val latestTopics = zkUtils.zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
debug("all topics: %s".format(latestTopics))
eventHandler.handleTopicEvent(latestTopics)
}
@@ -87,9 +87,9 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
@throws(classOf[Exception])
def handleNewSession() {
lock.synchronized {
- if (zkClient != null) {
+ if (zkUtils != null) {
info("ZK expired: resubscribing topic event listener to topic registry")
- zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
+ zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a7b44ca..0a1a684 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -43,8 +43,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.server._
import kafka.common.TopicAndPartition
-class ControllerContext(val zkClient: ZkClient,
- val zkConnection: ZkConnection,
+class ControllerContext(val zkUtils: ZkUtils,
val zkSessionTimeout: Int) {
var controllerChannelManager: ControllerChannelManager = null
val controllerLock: ReentrantLock = new ReentrantLock()
@@ -154,11 +153,11 @@ object KafkaController extends Logging {
}
}
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection: ZkConnection, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
- val controllerContext = new ControllerContext(zkClient, zkConnection, config.zkSessionTimeoutMs)
+ val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
@@ -321,7 +320,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
//read controller epoch from zk
readControllerEpochFromZookeeper()
// increment the controller epoch
- incrementControllerEpoch(zkClient)
+ incrementControllerEpoch(zkUtils.zkClient)
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
registerReassignedPartitionsListener()
registerIsrChangeNotificationListener()
@@ -613,7 +612,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
reassignedReplicas.toSet)
reassignedPartitionContext.isrChangeListener = isrChangeListener
// register listener on the leader and isr path to wait until they catch up with the current leader
- zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+ zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
}
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -703,7 +702,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
def incrementControllerEpoch(zkClient: ZkClient) = {
try {
var newControllerEpoch = controllerContext.epoch + 1
- val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient,
+ val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
if(!updateSucceeded)
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
@@ -732,14 +731,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
}
private def registerSessionExpirationListener() = {
- zkClient.subscribeStateChanges(new SessionExpirationListener())
+ zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
}
private def initializeControllerContext() {
// update controller cache with delete topic information
- controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
- controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
- controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
+ controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
+ controllerContext.allTopics = zkUtils.getAllTopics().toSet
+ controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// update the leader and isr cache for all existing partitions from Zookeeper
@@ -756,7 +755,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
private def initializePreferredReplicaElection() {
// initialize preferred replica election state
- val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
+ val partitionsUndergoingPreferredReplicaElection = zkUtils.getPartitionsUndergoingPreferredReplicaElection()
// check if they are already completed or topic was deleted
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
@@ -774,7 +773,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
// check if they are already completed or topic was deleted
val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
@@ -793,7 +792,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
}
private def initializeTopicDeletion() {
- val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet
+ val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
@@ -822,13 +821,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
}
def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
- val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions)
+ val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions)
for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
}
private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
- getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+ zkUtils.getLeaderAndIsrForPartition(topic, partition) match {
case Some(leaderAndIsr) =>
val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r))
replicasNotInIsr.isEmpty
@@ -930,42 +929,42 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
private def registerIsrChangeNotificationListener() = {
debug("Registering IsrChangeNotificationListener")
- zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
+ zkUtils.zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
}
private def deregisterIsrChangeNotificationListener() = {
debug("De-registering IsrChangeNotificationListener")
- zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
+ zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
}
private def registerReassignedPartitionsListener() = {
- zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
+ zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
private def deregisterReassignedPartitionsListener() = {
- zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
+ zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
private def registerPreferredReplicaElectionListener() {
- zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+ zkUtils.zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
}
private def deregisterPreferredReplicaElectionListener() {
- zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+ zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
}
private def deregisterReassignedPartitionsIsrChangeListeners() {
controllerContext.partitionsBeingReassigned.foreach {
case (topicAndPartition, reassignedPartitionsContext) =>
- val zkPartitionPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
- zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
+ val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
+ zkUtils.zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
}
}
private def readControllerEpochFromZookeeper() {
// initialize the controller epoch and zk version by reading from zookeeper
- if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) {
- val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)
+ if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) {
+ val epochData = controllerContext.zkUtils.readData(ZkUtils.ControllerEpochPath)
controllerContext.epoch = epochData._1.toInt
controllerContext.epochZkVersion = epochData._2.getVersion
info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
@@ -975,15 +974,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
// stop watching the ISR changes for this partition
- zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+ zkUtils.zkClient.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
}
// read the current list of reassigned partitions from zookeeper
- val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
+ val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
// remove this partition from that list
val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
// write the new list to zookeeper
- ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
+ zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
// update the cache. NO-OP if the partition's reassignment was never started
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
}
@@ -991,9 +990,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
try {
- val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
- val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
- ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
+ val zkPath = getTopicPath(topicAndPartition.topic)
+ val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
+ zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
@@ -1014,7 +1013,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
}
}
if (!isTriggeredByAutoRebalance)
- ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+ zkUtils.deletePath(ZkUtils.PreferredReplicaLeaderElectionPath)
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
}
@@ -1057,7 +1056,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
// refresh leader and isr from zookeeper again
- val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+ val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
@@ -1074,7 +1073,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
// if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
// is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
// eventually be restored as the leader.
- if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkClient,
+ if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkUtils,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
newIsr = leaderAndIsr.isr
@@ -1083,7 +1082,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
newIsr, leaderAndIsr.zkVersion + 1)
// update the new leadership decision in zookeeper or retry
- val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition,
+ val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
newLeaderAndIsr.zkVersion = newVersion
@@ -1120,7 +1119,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
// refresh leader and isr from zookeeper again
- val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+ val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
case Some(leaderIsrAndEpoch) =>
val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
@@ -1134,7 +1133,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
// update the new leadership decision in zookeeper or retry
- val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic,
+ val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic,
partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
newLeaderAndIsr.zkVersion = newVersion
@@ -1245,7 +1244,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection
*/
class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
- val zkClient = controller.controllerContext.zkClient
+ val zkUtils = controller.controllerContext.zkUtils
val controllerContext = controller.controllerContext
/**
@@ -1256,7 +1255,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
def handleDataChange(dataPath: String, data: Object) {
debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
.format(dataPath, data))
- val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
+ val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString)
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
}
@@ -1288,7 +1287,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
reassignedReplicas: Set[Int])
extends IZkDataListener with Logging {
this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
- val zkClient = controller.controllerContext.zkClient
+ val zkUtils = controller.controllerContext.zkUtils
val controllerContext = controller.controllerContext
/**
@@ -1305,7 +1304,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
case Some(reassignedPartitionContext) =>
// need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
- val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+ val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
newLeaderAndIsrOpt match {
case Some(leaderAndIsr) => // check if new replicas have joined ISR
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
@@ -1359,7 +1358,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
processUpdateNotifications(topicAndPartitions)
} finally {
// delete processed children
- childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
+ childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
ZkUtils.IsrChangeNotificationPath + "/" + x))
}
}
@@ -1373,7 +1372,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
- val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode)
+ val (jsonOpt, stat) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
if (jsonOpt.isDefined) {
val json = Json.parseFull(jsonOpt.get)
@@ -1410,7 +1409,7 @@ object IsrChangeNotificationListener {
*/
class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
- val zkClient = controller.controllerContext.zkClient
+ val zkUtils = controller.controllerContext.zkUtils
val controllerContext = controller.controllerContext
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 4ebeb5a..5eed382 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
case true =>
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
- if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkClient,
+ if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 675a807..73b173e 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -22,7 +22,8 @@ import collection.mutable.Buffer
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.utils.{Logging, ZkUtils, ReplicationUtils}
+import kafka.utils.{Logging, ReplicationUtils}
+import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.controller.Callbacks.CallbackBuilder
@@ -43,7 +44,7 @@ import kafka.utils.CoreUtils._
class PartitionStateMachine(controller: KafkaController) extends Logging {
private val controllerContext = controller.controllerContext
private val controllerId = controller.config.brokerId
- private val zkClient = controllerContext.zkClient
+ private val zkUtils = controllerContext.zkUtils
private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
private val hasStarted = new AtomicBoolean(false)
@@ -83,7 +84,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
deregisterTopicChangeListener()
addPartitionsListener.foreach {
case (topic, listener) =>
- zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
+ zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
}
addPartitionsListener.clear()
if(controller.config.deleteTopicEnable)
@@ -289,9 +290,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
controller.epoch)
debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
try {
- ZkUtils.createPersistentPath(controllerContext.zkClient,
- ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
- ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
+ zkUtils.createPersistentPath(
+ getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+ zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
// NOTE: the above write can fail only if the current controller lost its zk session and the new controller
// took over and initialized this partition. This can happen if the current controller went into a long
// GC pause
@@ -301,7 +302,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
} catch {
case e: ZkNodeExistsException =>
// read the controller epoch
- val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
+ val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
topicAndPartition.partition).get
val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
"exists with value %s and controller epoch %d")
@@ -342,7 +343,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
// elect new leader or throw exception
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
- val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition,
+ val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
newLeaderAndIsr = leaderAndIsr
newLeaderAndIsr.zkVersion = newVersion
@@ -370,34 +371,34 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
private def registerTopicChangeListener() = {
- zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
+ zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
}
private def deregisterTopicChangeListener() = {
- zkClient.unsubscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
+ zkUtils.zkClient.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
}
def registerPartitionChangeListener(topic: String) = {
addPartitionsListener.put(topic, new AddPartitionsListener(topic))
- zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
+ zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
}
def deregisterPartitionChangeListener(topic: String) = {
- zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
+ zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
addPartitionsListener.remove(topic)
}
private def registerDeleteTopicListener() = {
- zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
+ zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}
private def deregisterDeleteTopicListener() = {
- zkClient.unsubscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
+ zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
val topicAndPartition = TopicAndPartition(topic, partition)
- ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
+ ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) match {
case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
case None =>
val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state"
@@ -426,7 +427,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
- val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
+ val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
@@ -449,7 +450,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
*/
class DeleteTopicsListener() extends IZkChildListener with Logging {
this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
- val zkClient = controllerContext.zkClient
+ val zkUtils = controllerContext.zkUtils
/**
* Invoked when a topic is being deleted
@@ -466,7 +467,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
if(nonExistentTopics.size > 0) {
warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
- nonExistentTopics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic)))
+ nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
}
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
@@ -505,7 +506,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
inLock(controllerContext.controllerLock) {
try {
info("Add Partition triggered " + data.toString + " for path " + dataPath)
- val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+ val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
!controllerContext.partitionReplicaAssignment.contains(p._1))
if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index acad83a..32ed288 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -47,7 +47,7 @@ import kafka.utils.CoreUtils._
class ReplicaStateMachine(controller: KafkaController) extends Logging {
private val controllerContext = controller.controllerContext
private val controllerId = controller.config.brokerId
- private val zkClient = controllerContext.zkClient
+ private val zkUtils = controllerContext.zkUtils
private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
private val brokerChangeListener = new BrokerChangeListener()
private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
@@ -171,7 +171,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case NewReplica =>
assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
// start replica as a follower to the current leader for its partition
- val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+ val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
@@ -313,11 +313,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
private def registerBrokerChangeListener() = {
- zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+ zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
}
private def deregisterBrokerChangeListener() = {
- zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+ zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
}
/**
@@ -359,10 +359,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
try {
val curBrokerIds = currentBrokerList.map(_.toInt).toSet
val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
- val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
+ val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
- controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+ controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
.format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 9e39dd5..c6f80ac 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -23,8 +23,9 @@ import org.apache.kafka.common.requests.{StopReplicaResponse, AbstractRequestRes
import collection.mutable
import collection.JavaConverters._
-import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
+import kafka.utils.{ShutdownableThread, Logging}
import kafka.utils.CoreUtils._
+import kafka.utils.ZkUtils._
import collection.Set
import kafka.common.TopicAndPartition
import java.util.concurrent.locks.ReentrantLock
@@ -288,9 +289,10 @@ class TopicDeletionManager(controller: KafkaController,
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
- controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
- controllerContext.zkClient.deleteRecursive(ZkUtils.getEntityConfigPath(ConfigType.Topic, topic))
- controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
+ val zkUtils = controllerContext.zkUtils
+ zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
+ zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
+ zkUtils.zkClient.delete(getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
}
@@ -385,7 +387,7 @@ class TopicDeletionManager(controller: KafkaController,
}
class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
- val zkClient = controllerContext.zkClient
+ val zkUtils = controllerContext.zkUtils
override def doWork() {
awaitTopicDeletionNotification()
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 68ff4fc..bf23e9b 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -43,7 +43,7 @@ class ConsumerCoordinator(val brokerId: Int,
val groupConfig: GroupManagerConfig,
val offsetConfig: OffsetManagerConfig,
private val offsetManager: OffsetManager,
- zkClient: ZkClient) extends Logging {
+ zkUtils: ZkUtils) extends Logging {
this.logIdent = "[ConsumerCoordinator " + brokerId + "]: "
@@ -57,9 +57,9 @@ class ConsumerCoordinator(val brokerId: Int,
groupConfig: GroupManagerConfig,
offsetConfig: OffsetManagerConfig,
replicaManager: ReplicaManager,
- zkClient: ZkClient,
+ zkUtils: ZkUtils,
scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
- new OffsetManager(offsetConfig, replicaManager, zkClient, scheduler), zkClient)
+ new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler), zkUtils)
def offsetsTopicConfigs: Properties = {
val props = new Properties
@@ -81,7 +81,7 @@ class ConsumerCoordinator(val brokerId: Int,
info("Starting up.")
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId)
- coordinatorMetadata = new CoordinatorMetadata(brokerId, zkClient, maybePrepareRebalance)
+ coordinatorMetadata = new CoordinatorMetadata(brokerId, zkUtils, maybePrepareRebalance)
isActive.set(true)
info("Startup complete.")
}
@@ -499,7 +499,7 @@ object ConsumerCoordinator {
val OffsetsTopicName = "__consumer_offsets"
def create(config: KafkaConfig,
- zkClient: ZkClient,
+ zkUtils: ZkUtils,
replicaManager: ReplicaManager,
kafkaScheduler: KafkaScheduler): ConsumerCoordinator = {
val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
@@ -513,11 +513,11 @@ object ConsumerCoordinator {
val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
- new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkClient, kafkaScheduler)
+ new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler)
}
def create(config: KafkaConfig,
- zkClient: ZkClient,
+ zkUtils: ZkUtils,
offsetManager: OffsetManager): ConsumerCoordinator = {
val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
@@ -530,6 +530,6 @@ object ConsumerCoordinator {
val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
- new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkClient)
+ new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkUtils)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index 2920320..a33231a 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -20,7 +20,7 @@ package kafka.coordinator
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils.{threadsafe, ZkUtils, Logging}
-
+import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.{ZkClient, IZkDataListener}
import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -33,7 +33,7 @@ import scala.collection.mutable
*/
@threadsafe
private[coordinator] class CoordinatorMetadata(brokerId: Int,
- zkClient: ZkClient,
+ zkUtils: ZkUtils,
maybePrepareRebalance: ConsumerGroupMetadata => Unit) {
/**
@@ -159,19 +159,19 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int,
}
private def getTopicPartitionCountFromZK(topic: String) = {
- val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic))
+ val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
topicData(topic).size
}
private def registerTopicPartitionChangeListener(topic: String) {
val listener = new TopicPartitionChangeListener
topicPartitionChangeListeners.put(topic, listener)
- zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
+ zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), listener)
}
private def deregisterTopicPartitionChangeListener(topic: String) {
val listener = topicPartitionChangeListeners(topic)
- zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
+ zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
topicPartitionChangeListeners.remove(topic)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 2e5ee8d..9b4314e 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import scala.collection.JavaConverters._
import org.apache.log4j.Logger
@@ -59,7 +60,7 @@ object SimpleAclAuthorizer {
//notification node which gets updated with the resource name when acl on a resource is changed.
val AclChangedZkPath = "/kafka-acl-changes"
- //prefix of all the change notificiation sequence node.
+ //prefix of all the change notification sequence node.
val AclChangedPrefix = "acl_changes_"
}
@@ -67,7 +68,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger")
private var superUsers = Set.empty[KafkaPrincipal]
private var shouldAllowEveryoneIfNoAclIsFound = false
- private var zkClient: ZkClient = null
+ private var zkUtils: ZkUtils = null
private var aclChangeListener: ZkNodeChangeNotificationListener = null
private val aclCache = new scala.collection.mutable.HashMap[Resource, Set[Acl]]
@@ -92,16 +93,19 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val zkConnectionTimeoutMs = configs.getOrElse(SimpleAclAuthorizer.ZkConnectionTimeOutProp, kafkaConfig.zkConnectionTimeoutMs).toString.toInt
val zkSessionTimeOutMs = configs.getOrElse(SimpleAclAuthorizer.ZkSessionTimeOutProp, kafkaConfig.zkSessionTimeoutMs).toString.toInt
- zkClient = ZkUtils.createZkClient(zkUrl, zkConnectionTimeoutMs, zkSessionTimeOutMs)
- ZkUtils.makeSurePersistentPathExists(zkClient, SimpleAclAuthorizer.AclZkPath)
+ zkUtils = ZkUtils(zkUrl,
+ zkConnectionTimeoutMs,
+ zkSessionTimeOutMs,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+ zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
loadCache()
- ZkUtils.makeSurePersistentPathExists(zkClient, SimpleAclAuthorizer.AclChangedZkPath)
- aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler)
+ zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)
+ aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler)
aclChangeListener.init()
- zkClient.subscribeStateChanges(ZkStateChangeListener)
+ zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
}
override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
@@ -162,17 +166,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
val updatedAcls = getAcls(resource) ++ acls
val path = toResourcePath(resource)
- if (ZkUtils.pathExists(zkClient, path))
- ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
+ if (zkUtils.pathExists(path))
+ zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
else
- ZkUtils.createPersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
+ zkUtils.createPersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls)))
updateAclChangedFlag(resource)
}
}
override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
- if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
+ if (zkUtils.pathExists(toResourcePath(resource))) {
val existingAcls = getAcls(resource)
val filteredAcls = existingAcls.filter((acl: Acl) => !aclsTobeRemoved.contains(acl))
@@ -180,9 +184,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
if (aclNeedsRemoval) {
val path: String = toResourcePath(resource)
if (filteredAcls.nonEmpty)
- ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
+ zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls)))
else
- ZkUtils.deletePath(zkClient, toResourcePath(resource))
+ zkUtils.deletePath(toResourcePath(resource))
updateAclChangedFlag(resource)
}
@@ -192,8 +196,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}
override def removeAcls(resource: Resource): Boolean = {
- if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) {
- ZkUtils.deletePath(zkClient, toResourcePath(resource))
+ if (zkUtils.pathExists(toResourcePath(resource))) {
+ zkUtils.deletePath(toResourcePath(resource))
updateAclChangedFlag(resource)
true
} else false
@@ -206,7 +210,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}
private def getAclsFromZk(resource: Resource): Set[Acl] = {
- val aclJson = ZkUtils.readDataMaybeNull(zkClient, toResourcePath(resource))._1
+ val aclJson = zkUtils.readDataMaybeNull(toResourcePath(resource))._1
aclJson.map(Acl.fromJson).getOrElse(Set.empty)
}
@@ -224,11 +228,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
private def loadCache() {
var acls = Set.empty[Acl]
- val resourceTypes = ZkUtils.getChildren(zkClient, SimpleAclAuthorizer.AclZkPath)
+ val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
for (rType <- resourceTypes) {
val resourceType = ResourceType.fromString(rType)
val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
- val resourceNames = ZkUtils.getChildren(zkClient, resourceTypePath)
+ val resourceNames = zkUtils.getChildren(resourceTypePath)
for (resourceName <- resourceNames) {
acls = getAclsFromZk(Resource(resourceType, resourceName.toString))
updateCache(new Resource(resourceType, resourceName), acls)
@@ -255,7 +259,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
}
private def updateAclChangedFlag(resource: Resource) {
- ZkUtils.createSequentialPersistentPath(zkClient, SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
+ zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString)
}
object AclChangedNotificaitonHandler extends NotificationHandler {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 4da1833..d443a1f 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -70,7 +70,7 @@ object ConfigType {
* on startup where a change might be missed between the initial config load and registering for change notifications.
*
*/
-class DynamicConfigManager(private val zkClient: ZkClient,
+class DynamicConfigManager(private val zkUtils: ZkUtils,
private val configHandler : Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
private val time: Time = SystemTime) extends Logging {
@@ -80,8 +80,8 @@ class DynamicConfigManager(private val zkClient: ZkClient,
* Begin watching for config changes
*/
def startup() {
- ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.EntityConfigChangesPath)
- zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
+ zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)
+ zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)
processAllConfigChanges()
}
@@ -89,7 +89,7 @@ class DynamicConfigManager(private val zkClient: ZkClient,
* Process all config changes
*/
private def processAllConfigChanges() {
- val configChanges = zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
+ val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)
import JavaConversions._
processConfigChanges((configChanges: mutable.Buffer[String]).sorted)
}
@@ -107,7 +107,7 @@ class DynamicConfigManager(private val zkClient: ZkClient,
if (changeId > lastExecutedChange) {
val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
- val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode)
+ val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)
processNotification(jsonOpt)
}
lastExecutedChange = changeId
@@ -138,7 +138,7 @@ class DynamicConfigManager(private val zkClient: ZkClient,
case Some(value: String) => value
case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
}
- configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkClient, entityType, entity))
+ configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkUtils, entityType, entity))
case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
"{\"version\" : 1," +
@@ -151,12 +151,12 @@ class DynamicConfigManager(private val zkClient: ZkClient,
private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) {
for(notification <- notifications.sorted) {
- val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.EntityConfigChangesPath + "/" + notification)
+ val (jsonOpt, stat) = zkUtils.readDataMaybeNull(ZkUtils.EntityConfigChangesPath + "/" + notification)
if(jsonOpt.isDefined) {
val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification
if (now - stat.getCtime > changeExpirationMs) {
debug("Purging config change notification " + notification)
- ZkUtils.deletePath(zkClient, changeZnode)
+ zkUtils.deletePath(changeZnode)
} else {
return
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5715626..6acab8d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -44,7 +44,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val coordinator: ConsumerCoordinator,
val controller: KafkaController,
- val zkClient: ZkClient,
+ val zkUtils: ZkUtils,
val brokerId: Int,
val config: KafkaConfig,
val metadataCache: MetadataCache,
@@ -221,7 +221,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
(topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
} else {
- ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+ zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" +
topicAndPartition.partition, metaAndError.offset.toString)
(topicAndPartition, ErrorMapping.NoError)
}
@@ -535,14 +535,14 @@ class KafkaApis(val requestChannel: RequestChannel,
Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
else
config.offsetsTopicReplicationFactor.toInt
- AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
+ AdminUtils.createTopic(zkUtils, topic, config.offsetsTopicPartitions,
offsetsTopicReplicationFactor,
coordinator.offsetsTopicConfigs)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
}
else {
- AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+ AdminUtils.createTopic(zkUtils, topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.numPartitions, config.defaultReplicationFactor))
}
@@ -624,7 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
(topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
} else {
- val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
+ val payloadOpt = zkUtils.readDataMaybeNull(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
payloadOpt match {
case Some(payload) => (topicAndPartition, OffsetMetadataAndError(payload.toLong))
case None => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 16760d4..928ff43 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -35,14 +35,13 @@ import java.net.InetAddress
*/
class KafkaHealthcheck(private val brokerId: Int,
private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
- private val zkClient: ZkClient,
- private val zkConnection: ZkConnection) extends Logging {
+ private val zkUtils: ZkUtils) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener
def startup() {
- zkClient.subscribeStateChanges(sessionExpireListener)
+ zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)
register()
}
@@ -62,7 +61,7 @@ class KafkaHealthcheck(private val brokerId: Int,
// only PLAINTEXT is supported as default
// if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
- ZkUtils.registerBrokerInZk(zkClient, zkConnection, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
+ zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 99a3c12..f50c266 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkRece
import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend}
+import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.ssl.SSLFactory
import org.apache.kafka.common.utils.AppInfoParser
@@ -128,8 +129,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
var kafkaHealthcheck: KafkaHealthcheck = null
val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
- var zkClient: ZkClient = null
- var zkConnection: ZkConnection = null
+ var zkUtils: ZkUtils = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
@@ -165,12 +165,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
kafkaScheduler.startup()
/* setup zookeeper */
- val (client, connection) = initZk()
- zkClient = client
- zkConnection = connection
+ zkUtils = initZk()
/* start log manager */
- logManager = createLogManager(zkClient, brokerState)
+ logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()
/* generate brokerId */
@@ -181,16 +179,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
socketServer.startup()
/* start replica manager */
- replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkClient, kafkaScheduler, logManager,
+ replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
isShuttingDown)
replicaManager.startup()
/* start kafka controller */
- kafkaController = new KafkaController(config, zkClient, zkConnection, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
+ kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
kafkaController.startup()
/* start kafka coordinator */
- consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler)
+ consumerCoordinator = ConsumerCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
consumerCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
@@ -204,7 +202,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
- kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer)
+ kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
@@ -213,7 +211,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
ConfigType.Client -> new ClientIdConfigHandler)
- dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
+ dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
dynamicConfigManager.startup()
/* tell everyone we are alive */
@@ -223,7 +221,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
else
(protocol, endpoint)
}
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkClient, zkConnection)
+ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
kafkaHealthcheck.startup()
/* register broker metrics */
@@ -245,7 +243,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
}
}
- private def initZk(): (ZkClient, ZkConnection) = {
+ private def initZk(): ZkUtils = {
info("Connecting to zookeeper on " + config.zkConnect)
val chroot = {
@@ -257,15 +255,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
if (chroot.length > 1) {
val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
- val zkClientForChrootCreation = ZkUtils.createZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
- ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
+ val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
+ config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+ zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
info("Created zookeeper path " + chroot)
- zkClientForChrootCreation.close()
+ zkClientForChrootCreation.zkClient.close()
}
- val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
- ZkUtils.setupCommonPaths(zkClient)
- (zkClient, zkConnection)
+ val zkUtils = ZkUtils(config.zkConnect,
+ config.zkSessionTimeoutMs,
+ config.zkConnectionTimeoutMs,
+ JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
+ zkUtils.setupCommonPaths()
+ zkUtils
}
@@ -334,8 +338,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request
- val controllerId = ZkUtils.getController(zkClient)
- ZkUtils.getBrokerInfo(zkClient, controllerId) match {
+ val controllerId = zkUtils.getController()
+ zkUtils.getBrokerInfo(controllerId) match {
case Some(broker) =>
// if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
// attempt, connect to the most recent controller
@@ -410,8 +414,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request
- val controllerId = ZkUtils.getController(zkClient)
- ZkUtils.getBrokerInfo(zkClient, controllerId) match {
+ val controllerId = zkUtils.getController()
+ zkUtils.getBrokerInfo(controllerId) match {
case Some(broker) =>
if (channel == null || prevController == null || !prevController.equals(broker)) {
// if this is the first attempt or if the controller has changed, create a channel to the most recent
@@ -524,8 +528,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
CoreUtils.swallow(consumerCoordinator.shutdown())
if(kafkaController != null)
CoreUtils.swallow(kafkaController.shutdown())
- if(zkClient != null)
- CoreUtils.swallow(zkClient.close())
+ if(zkUtils != null)
+ CoreUtils.swallow(zkUtils.close())
if (metrics != null)
CoreUtils.swallow(metrics.close())
@@ -559,7 +563,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
- val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
+ val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).mapValues(LogConfig.fromProps(defaultProps, _))
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
@@ -626,7 +630,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
private def generateBrokerId: Int = {
try {
- ZkUtils.getBrokerSequenceId(zkClient, config.maxReservedBrokerId)
+ zkUtils.getBrokerSequenceId(config.maxReservedBrokerId)
} catch {
case e: Exception =>
error("Failed to generate broker.id due to ", e)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0e613e7..bdc3bb6 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -87,7 +87,7 @@ object OffsetManagerConfig {
class OffsetManager(val config: OffsetManagerConfig,
replicaManager: ReplicaManager,
- zkClient: ZkClient,
+ zkUtils: ZkUtils,
scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
/* offsets and metadata cache */
@@ -449,7 +449,7 @@ class OffsetManager(val config: OffsetManagerConfig,
*/
private def getOffsetsTopicPartitionCount = {
val topic = ConsumerCoordinator.OffsetsTopicName
- val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic))
+ val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
if (topicData(topic).nonEmpty)
topicData(topic).size
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 82a6001..0a17fd0 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -144,7 +144,7 @@ class ReplicaFetcherThread(name: String,
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
- if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkClient,
+ if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c0fec67..1fc47f4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -99,7 +99,7 @@ class ReplicaManager(val config: KafkaConfig,
metrics: Metrics,
time: Time,
jTime: JTime,
- val zkClient: ZkClient,
+ val zkUtils: ZkUtils,
scheduler: Scheduler,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean,
@@ -163,7 +163,7 @@ class ReplicaManager(val config: KafkaConfig,
def maybePropagateIsrChanges() {
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty) {
- ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet)
+ ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
isrChangeSet.clear()
}
}