You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/18 16:15:06 UTC
[4/5] kafka git commit: KAFKA-5642;
Use async ZookeeperClient in Controller
http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 811ff67..d4a012e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,31 +16,28 @@
*/
package kafka.controller
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.yammer.metrics.core.Gauge
-import kafka.admin.{AdminUtils, PreferredReplicaLeaderElectionCommand}
+import kafka.admin.AdminOperationException
import kafka.api._
import kafka.cluster.Broker
import kafka.common._
-import kafka.log.LogConfig
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
-import kafka.utils.ZkUtils._
import kafka.utils._
-import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, StopReplicaResponse, LeaderAndIsrResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse}
import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import scala.collection._
import scala.util.Try
-class ControllerContext(val zkUtils: ZkUtils) {
+class ControllerContext {
val stats = new ControllerStats
var controllerChannelManager: ControllerChannelManager = null
@@ -133,33 +130,13 @@ object KafkaController extends Logging {
val InitialControllerEpoch = 1
val InitialControllerEpochZkVersion = 1
- def parseControllerId(controllerInfoString: String): Int = {
- try {
- Json.parseFull(controllerInfoString) match {
- case Some(js) => js.asJsonObject("brokerid").to[Int]
- case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
- }
- } catch {
- case _: Throwable =>
- // It may be due to an incompatible controller register version
- warn("Failed to parse the controller info as json. "
- + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
- try controllerInfoString.toInt
- catch {
- case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
- }
- }
- }
}
-class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
-
+class KafkaController(val config: KafkaConfig, kafkaControllerZkUtils: KafkaControllerZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
- val controllerContext = new ControllerContext(zkUtils)
- val partitionStateMachine = new PartitionStateMachine(this, stateChangeLogger)
- val replicaStateMachine = new ReplicaStateMachine(this, stateChangeLogger)
+ val controllerContext = new ControllerContext
// have a separate scheduler for the controller to be able to start and stop independently of the kafka server
// visible for testing
@@ -169,21 +146,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
_ => updateMetrics())
- val topicDeletionManager = new TopicDeletionManager(this, eventManager)
- val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
- private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
- private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
- private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
+ val topicDeletionManager = new TopicDeletionManager(this, eventManager, kafkaControllerZkUtils)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
-
- private val brokerChangeListener = new BrokerChangeListener(this, eventManager)
- private val topicChangeListener = new TopicChangeListener(this, eventManager)
- private val topicDeletionListener = new TopicDeletionListener(this, eventManager)
- private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
- private val partitionReassignmentListener = new PartitionReassignmentListener(this, eventManager)
- private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this, eventManager)
- private val isrChangeNotificationListener = new IsrChangeNotificationListener(this, eventManager)
- private val logDirEventNotificationListener = new LogDirEventNotificationListener(this, eventManager)
+ val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, kafkaControllerZkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+ val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, kafkaControllerZkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+
+ private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
+ private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
+ private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
+ private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
+ private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
+ private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)
+ private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)
+ private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)
+ private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)
@volatile private var activeControllerId = -1
@volatile private var offlinePartitionCount = 0
@@ -270,35 +246,40 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
* This ensures another controller election will be triggered and there will always be an actively serving controller
*/
def onControllerFailover() {
- info("Starting become controller state transition")
+ info("Reading controller epoch from zookeeper")
readControllerEpochFromZookeeper()
+ info("Incrementing controller epoch in zookeeper")
incrementControllerEpoch()
- LogDirUtils.deleteLogDirEvents(zkUtils)
-
+ info("Registering handlers")
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
- registerPartitionReassignmentListener()
- registerIsrChangeNotificationListener()
- registerPreferredReplicaElectionListener()
- registerTopicChangeListener()
- registerTopicDeletionListener()
- registerBrokerChangeListener()
- registerLogDirEventNotificationListener()
-
+ kafkaControllerZkUtils.registerZNodeChildChangeHandler(brokerChangeHandler)
+ kafkaControllerZkUtils.registerZNodeChildChangeHandler(topicChangeHandler)
+ kafkaControllerZkUtils.registerZNodeChildChangeHandler(topicDeletionHandler)
+ kafkaControllerZkUtils.registerZNodeChildChangeHandler(logDirEventNotificationHandler)
+ kafkaControllerZkUtils.registerZNodeChildChangeHandler(isrChangeNotificationHandler)
+ kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
+ kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
+ info("Deleting log dir event notifications")
+ kafkaControllerZkUtils.deleteLogDirEventNotifications()
+ info("Deleting isr change notifications")
+ kafkaControllerZkUtils.deleteIsrChangeNotifications()
+ info("Initializing controller context")
initializeControllerContext()
+ info("Fetching topic deletions in progress")
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
+ info("Initializing topic deletion manager")
topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
// We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
    // are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
// they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
// partitionStateMachine.startup().
+ info("Sending update metadata request")
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
replicaStateMachine.startup()
partitionStateMachine.startup()
- // register the partition change listeners for all existing topics on failover
- controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
info(s"Ready to serve as the new controller with epoch $epoch")
maybeTriggerPartitionReassignment()
topicDeletionManager.tryTopicDeletion()
@@ -323,10 +304,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
def onControllerResignation() {
debug("Resigning")
// de-register listeners
- deregisterIsrChangeNotificationListener()
- deregisterPartitionReassignmentListener()
- deregisterPreferredReplicaElectionListener()
- deregisterLogDirEventNotificationListener()
+ kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
+ kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
+ kafkaControllerZkUtils.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
+ kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
// reset topic deletion manager
topicDeletionManager.reset()
@@ -339,15 +320,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
globalPartitionCount = 0
// de-register partition ISR listener for on-going partition reassignment task
- deregisterPartitionReassignmentIsrChangeListeners()
+ unregisterPartitionReassignmentIsrChangeHandlers()
// shutdown partition state machine
partitionStateMachine.shutdown()
- deregisterTopicChangeListener()
- partitionModificationsListeners.keys.foreach(deregisterPartitionModificationsListener)
- deregisterTopicDeletionListener()
+ kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
+ unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
+ kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
// shutdown replica state machine
replicaStateMachine.shutdown()
- deregisterBrokerChangeListener()
+ kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
resetControllerContext()
@@ -367,7 +348,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
def onBrokerLogDirFailure(brokerIds: Seq[Int]) {
// send LeaderAndIsrRequest for all replicas on those brokers to see if they are still online.
val replicasOnBrokers = controllerContext.replicasOnBrokers(brokerIds.toSet)
- replicaStateMachine.handleStateChanges(replicasOnBrokers, OnlineReplica)
+ replicaStateMachine.handleStateChanges(replicasOnBrokers.toSeq, OnlineReplica)
}
/**
@@ -396,7 +377,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
// the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
// supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
- replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
+ replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
// when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
// to see if these brokers can become leaders for some/all of those
partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -450,11 +431,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
!topicDeletionManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
// trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
- partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
+ partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
// trigger OnlinePartition state changes for offline or new partitions
partitionStateMachine.triggerOnlinePartitionStateChange()
// trigger OfflineReplica state change for those newly offline replicas
- replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion, OfflineReplica)
+ replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
// fail deletion of topics that affected by the offline replicas
if (newOfflineReplicasForDeletion.nonEmpty) {
@@ -472,20 +453,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
/**
- * This callback is invoked by the partition state machine's topic change listener with the list of new topics
- * and partitions as input. It does the following -
- * 1. Registers partition change listener. This is not required until KAFKA-347
- * 2. Invokes the new partition callback
- * 3. Send metadata request with the new topic to all brokers so they allow requests for that topic to be served
- */
- def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
- info("New topic creation callback for %s".format(newPartitions.mkString(",")))
- // subscribe to partition changes
- topics.foreach(topic => registerPartitionModificationsListener(topic))
- onNewPartitionCreation(newPartitions)
- }
-
- /**
   * This callback is invoked by the topic change callback with the list of new partitions as input.
* It does the following -
* 1. Move the newly created partitions to the NewPartition state
@@ -493,10 +460,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
*/
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
info("New partition creation callback for %s".format(newPartitions.mkString(",")))
- partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
- replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
- partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
- replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
+ partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
+ replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
+ partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+ replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
}
/**
@@ -542,7 +509,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
*/
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
- if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
+ if (!areReplicasInIsr(topicAndPartition, reassignedReplicas)) {
info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
"reassigned not yet caught up with the leader")
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
@@ -561,7 +528,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
//5. replicas in RAR -> OnlineReplica
reassignedReplicas.foreach { replica =>
- replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
+ replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
replica)), OnlineReplica)
}
//6. Set AR to RAR in memory.
@@ -584,15 +551,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
}
- private def watchIsrChangesForReassignedPartition(topic: String,
- partition: Int,
+ private def watchIsrChangesForReassignedPartition(partition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
- val reassignedReplicas = reassignedPartitionContext.newReplicas
- val isrChangeListener = new PartitionReassignmentIsrChangeListener(this, eventManager, topic, partition,
- reassignedReplicas.toSet)
- reassignedPartitionContext.isrChangeListener = isrChangeListener
+ val partitionReassignmentIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
+ reassignedPartitionContext.partitionReassignmentIsrChangeHandler = partitionReassignmentIsrChangeHandler
// register listener on the leader and isr path to wait until they catch up with the current leader
- zkUtils.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+ kafkaControllerZkUtils.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
}
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -610,7 +574,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
} else {
info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
// first register ISR change listener
- watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+ watchIsrChangesForReassignedPartition(topicAndPartition, reassignedPartitionContext)
controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
// mark topic ineligible for deletion for the partitions being reassigned
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
@@ -629,7 +593,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
- partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+ partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
} catch {
case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
@@ -663,49 +627,38 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
def incrementControllerEpoch() = {
- try {
- val newControllerEpoch = controllerContext.epoch + 1
- val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
- ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
- if(!updateSucceeded)
+ val newControllerEpoch = controllerContext.epoch + 1
+ val setDataResponse = kafkaControllerZkUtils.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
+ if (Code.get(setDataResponse.rc) == Code.OK) {
+ controllerContext.epochZkVersion = setDataResponse.stat.getVersion
+ controllerContext.epoch = newControllerEpoch
+ } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+ // if path doesn't exist, this is the first controller whose epoch should be 1
+ // the following call can still fail if another controller gets elected between checking if the path exists and
+ // trying to create the controller epoch path
+ val createResponse = kafkaControllerZkUtils.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
+ if (Code.get(createResponse.rc) == Code.OK) {
+ controllerContext.epoch = KafkaController.InitialControllerEpoch
+ controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
+ } else if (Code.get(createResponse.rc) == Code.NODEEXISTS) {
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
- else {
- controllerContext.epochZkVersion = newVersion
- controllerContext.epoch = newControllerEpoch
+ } else {
+ val exception = KeeperException.create(Code.get(createResponse.rc))
+ error("Error while incrementing controller epoch", exception)
+ throw exception
}
- } catch {
- case _: ZkNoNodeException =>
- // if path doesn't exist, this is the first controller whose epoch should be 1
- // the following call can still fail if another controller gets elected between checking if the path exists and
- // trying to create the controller epoch path
- try {
- zkUtils.createPersistentPath(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
- controllerContext.epoch = KafkaController.InitialControllerEpoch
- controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
- } catch {
- case _: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
- "Aborting controller startup procedure")
- case oe: Throwable => error("Error while incrementing controller epoch", oe)
- }
- case oe: Throwable => error("Error while incrementing controller epoch", oe)
-
+ } else {
+ throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
}
- info(s"Incremented epoch to ${controllerContext.epoch}")
- }
-
- private def registerSessionExpirationListener() = {
- zkUtils.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
- }
-
- private def registerControllerChangeListener() = {
- zkUtils.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
+ info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
}
private def initializeControllerContext() {
// update controller cache with delete topic information
- controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
- controllerContext.allTopics = zkUtils.getAllTopics().toSet
- controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
+ controllerContext.liveBrokers = kafkaControllerZkUtils.getAllBrokersInCluster.toSet
+ controllerContext.allTopics = kafkaControllerZkUtils.getAllTopicsInCluster.toSet
+ registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
+ controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ kafkaControllerZkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// update the leader and isr cache for all existing partitions from Zookeeper
@@ -719,7 +672,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
- val partitionsUndergoingPreferredReplicaElection = zkUtils.getPartitionsUndergoingPreferredReplicaElection()
+ val partitionsUndergoingPreferredReplicaElection = kafkaControllerZkUtils.getPreferredReplicaElection
// check if they are already completed or topic was deleted
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
@@ -755,7 +708,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
- val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
+ val partitionsBeingReassigned = kafkaControllerZkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
// check if they are already completed or topic was deleted
val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
@@ -774,7 +727,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
- val topicsToBeDeleted = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
+ val topicsToBeDeleted = kafkaControllerZkUtils.getTopicDeletions.toSet
val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
}.keySet.map(_.topic)
@@ -797,15 +750,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
controllerContext.controllerChannelManager.startup()
}
- def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
- val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(topicAndPartitions)
- for ((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
- controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
+ def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
+ val leaderIsrAndControllerEpochs = kafkaControllerZkUtils.getTopicPartitionStates(partitions)
+ leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
+ controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+ }
}
- private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
- zkUtils.getLeaderAndIsrForPartition(topic, partition).exists { leaderAndIsr =>
- replicas.forall(leaderAndIsr.isr.contains)
+ private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = {
+ kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
+ replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
}
}
@@ -821,7 +775,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
// move the leader to one of the alive and caught up new replicas
- partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
+ partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
} else {
// check if the leader is alive or not
if (controllerContext.isReplicaOnline(currentLeader, topicAndPartition)) {
@@ -832,7 +786,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
} else {
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
- partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
+ partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
}
}
}
@@ -844,22 +798,28 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
val partition = topicAndPartition.partition
// first move the replica to offline state (the controller removes it from the ISR)
val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
- replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica)
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, OfflineReplica)
// send stop replica command to the old replicas
- replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted)
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionStarted)
// TODO: Eventually partition reassignment could use a callback that does retries if deletion failed
- replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful)
- replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionSuccessful)
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, NonExistentReplica)
}
- private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
+ private def updateAssignedReplicasForPartition(partition: TopicAndPartition,
replicas: Seq[Int]) {
- val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
- partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
- updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
- info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
- // update the assigned replica list after a successful zookeeper write
- controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
+ val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(partition.topic))
+ partitionsAndReplicasForThisTopic.put(partition, replicas)
+ val setDataResponse = kafkaControllerZkUtils.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
+ if (Code.get(setDataResponse.rc) == Code.OK) {
+ info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(",")))
+ // update the assigned replica list after a successful zookeeper write
+ controllerContext.partitionReplicaAssignment.put(partition, replicas)
+ } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+ throw new IllegalStateException("Topic %s doesn't exist".format(partition.topic))
+ } else {
+ throw new KafkaException(KeeperException.create(Code.get(setDataResponse.rc)))
+ }
}
private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
@@ -868,18 +828,18 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
// send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
// replicas list
newReplicas.foreach { replica =>
- replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
+ replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
}
}
- private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+ private def updateLeaderEpochAndSendRequest(partition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
- updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
+ updateLeaderEpoch(partition) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
try {
brokerRequestBatch.newBatch()
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
- topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition.topic,
+ partition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: IllegalStateException =>
@@ -887,97 +847,43 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with new assigned replica " +
s"list ${newAssignedReplicas.mkString(",")} to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
- s"for partition being reassigned $topicAndPartition")
+ s"for partition being reassigned $partition")
case None => // fail the reassignment
stateChangeLog.error("Failed to send LeaderAndIsr request with new assigned replica list " +
- s"${newAssignedReplicas.mkString( ",")} to leader for partition being reassigned $topicAndPartition")
+ s"${newAssignedReplicas.mkString( ",")} to leader for partition being reassigned $partition")
}
}
- private def registerBrokerChangeListener() = {
- zkUtils.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
- }
-
- private def deregisterBrokerChangeListener() = {
- zkUtils.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
- }
-
- private def registerTopicChangeListener() = {
- zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
- }
-
- private def deregisterTopicChangeListener() = {
- zkUtils.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
- }
-
- def registerPartitionModificationsListener(topic: String) = {
- partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, eventManager, topic))
- zkUtils.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
- }
-
- def deregisterPartitionModificationsListener(topic: String) = {
- zkUtils.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
- partitionModificationsListeners.remove(topic)
- }
-
- private def registerTopicDeletionListener() = {
- zkUtils.subscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
- }
-
- private def deregisterTopicDeletionListener() = {
- zkUtils.unsubscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
- }
-
- private def registerPartitionReassignmentListener() = {
- zkUtils.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
- }
-
- private def deregisterPartitionReassignmentListener() = {
- zkUtils.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
- }
-
- private def registerIsrChangeNotificationListener() = {
- debug("Registering IsrChangeNotificationListener")
- zkUtils.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
- }
-
- private def deregisterIsrChangeNotificationListener() = {
- debug("De-registering IsrChangeNotificationListener")
- zkUtils.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
- }
-
- private def registerPreferredReplicaElectionListener() {
- zkUtils.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+ private def registerPartitionModificationsHandlers(topics: Seq[String]) = {
+ topics.foreach { topic =>
+ val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic)
+ partitionModificationsHandlers.put(topic, partitionModificationsHandler)
+ }
+ partitionModificationsHandlers.values.foreach(kafkaControllerZkUtils.registerZNodeChangeHandler)
}
- private def deregisterPreferredReplicaElectionListener() {
- zkUtils.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+ def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
+ topics.foreach { topic =>
+ partitionModificationsHandlers.remove(topic)
+ .foreach(handler => kafkaControllerZkUtils.unregisterZNodeChangeHandler(handler.path))
+ }
}
- private def deregisterPartitionReassignmentIsrChangeListeners() {
+ private def unregisterPartitionReassignmentIsrChangeHandlers() {
controllerContext.partitionsBeingReassigned.foreach {
case (topicAndPartition, reassignedPartitionsContext) =>
- val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
- zkUtils.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
+ val partitionReassignmentIsrChangeHandler =
+ reassignedPartitionsContext.partitionReassignmentIsrChangeHandler
+ kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
}
}
- private def registerLogDirEventNotificationListener() = {
- debug("Registering logDirEventNotificationListener")
- zkUtils.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
- }
-
- private def deregisterLogDirEventNotificationListener() = {
- debug("De-registering logDirEventNotificationListener")
- zkUtils.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
- }
-
private def readControllerEpochFromZookeeper() {
// initialize the controller epoch and zk version by reading from zookeeper
- if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) {
- val epochData = controllerContext.zkUtils.readData(ZkUtils.ControllerEpochPath)
- controllerContext.epoch = epochData._1.toInt
- controllerContext.epochZkVersion = epochData._2.getVersion
+ val epochAndStatOpt = kafkaControllerZkUtils.getControllerEpoch
+ epochAndStatOpt.foreach { case (epoch, stat) =>
+ controllerContext.epoch = epoch
+ controllerContext.epochZkVersion = stat.getVersion
info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
}
}
@@ -985,33 +891,35 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
// stop watching the ISR changes for this partition
- zkUtils.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
- controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+ val partitionReassignmentIsrChangeHandler =
+ controllerContext.partitionsBeingReassigned(topicAndPartition).partitionReassignmentIsrChangeHandler
+ kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
}
// read the current list of reassigned partitions from zookeeper
- val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
+ val partitionsBeingReassigned = kafkaControllerZkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
// remove this partition from that list
val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
// write the new list to zookeeper
- if (updatedPartitionsBeingReassigned.size < partitionsBeingReassigned.size)
- zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
+ val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
+ if (reassignment.isEmpty) {
+ info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
+ kafkaControllerZkUtils.deletePartitionReassignment()
+ } else {
+ val setDataResponse = kafkaControllerZkUtils.setPartitionReassignmentRaw(reassignment)
+ if (Code.get(setDataResponse.rc) == Code.OK) {
+ } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+ val createDataResponse = kafkaControllerZkUtils.createPartitionReassignment(reassignment)
+ if (Code.get(createDataResponse.rc) != Code.OK) {
+ throw new AdminOperationException(KeeperException.create(Code.get(createDataResponse.rc)))
+ }
+ } else {
+ throw new AdminOperationException(KeeperException.create(Code.get(setDataResponse.rc)))
+ }
+ }
// update the cache. NO-OP if the partition's reassignment was never started
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
}
- def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
- newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
- try {
- val zkPath = getTopicPath(topicAndPartition.topic)
- val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => e._1.partition.toString -> e._2))
- zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
- debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
- } catch {
- case _: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
- case e2: Throwable => throw new KafkaException(e2.toString)
- }
- }
-
def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
isTriggeredByAutoRebalance : Boolean) {
for(partition <- partitionsToBeRemoved) {
@@ -1025,7 +933,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
}
if (!isTriggeredByAutoRebalance)
- zkUtils.deletePath(ZkUtils.PreferredReplicaLeaderElectionPath)
+ kafkaControllerZkUtils.deletePreferredReplicaElection()
}
/**
@@ -1046,93 +954,22 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
/**
- * Removes a given partition replica from the ISR; if it is not the current
- * leader and there are sufficient remaining replicas in ISR.
- *
- * @param topic topic
- * @param partition partition
- * @param replicaId replica Id
- * @return the new leaderAndIsr (with the replica removed if it was present),
- * or None if leaderAndIsr is empty.
- */
- def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
- val topicAndPartition = TopicAndPartition(topic, partition)
- debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
- controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.isr.mkString(","), topicAndPartition))
- var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
- var zkWriteCompleteOrUnnecessary = false
- while (!zkWriteCompleteOrUnnecessary) {
- // refresh leader and isr from zookeeper again
- val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
- zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
- case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
- val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
- val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
- if(controllerEpoch > epoch)
- throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
- "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
- "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
- if (leaderAndIsr.isr.contains(replicaId)) {
- // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
- val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
- var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
-
- // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
- // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
- // eventually be restored as the leader.
- if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkUtils,
- ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
- info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
- newIsr = leaderAndIsr.isr
- }
-
- val newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
- // update the new leadership decision in zookeeper or retry
- val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
- newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
-
- val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
- finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion, epoch))
- controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
- if (updateSucceeded) {
- info(s"New leader and ISR for partition $topicAndPartition is $leaderWithNewVersion")
- }
- updateSucceeded
- } else {
- warn(s"Cannot remove replica $replicaId from ISR of partition $topicAndPartition since it is not in the ISR." +
- s" Leader = ${leaderAndIsr.leader} ; ISR = ${leaderAndIsr.isr}")
- finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
- controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
- true
- }
- case None =>
- warn("Cannot remove replica %d from ISR of %s - leaderAndIsr is empty.".format(replicaId, topicAndPartition))
- true
- }
- }
- finalLeaderIsrAndControllerEpoch
- }
-
- /**
* Does not change leader or isr, but just increments the leader epoch
*
- * @param topic topic
* @param partition partition
* @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
*/
- private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
- val topicAndPartition = TopicAndPartition(topic, partition)
- debug("Updating leader epoch for partition %s.".format(topicAndPartition))
+ private def updateLeaderEpoch(partition: TopicAndPartition): Option[LeaderIsrAndControllerEpoch] = {
+ debug("Updating leader epoch for partition %s.".format(partition))
var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
// refresh leader and isr from zookeeper again
- val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
- zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
- case Some(leaderIsrAndEpoch) =>
- val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
- val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
- if(controllerEpoch > epoch)
+ zkWriteCompleteOrUnnecessary = kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+ case Some(leaderIsrAndControllerEpoch) =>
+ val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+ val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+ if (controllerEpoch > epoch)
throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
"means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
@@ -1140,19 +977,19 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
// assigned replica list
val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
// update the new leadership decision in zookeeper or retry
- val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic,
- partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
-
- val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
- finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion, epoch))
- if (updateSucceeded) {
- info(s"Updated leader epoch for partition $topicAndPartition to ${leaderWithNewVersion.leaderEpoch}")
- }
- updateSucceeded
+ val (successfulUpdates, updatesToRetry, failedUpdates) =
+ kafkaControllerZkUtils.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
+ if (successfulUpdates.contains(partition)) {
+ val finalLeaderAndIsr = successfulUpdates(partition)
+ finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
+ info(s"Updated leader epoch for partition $partition to ${finalLeaderAndIsr.leaderEpoch}")
+ true
+ } else if (failedUpdates.contains(partition)) {
+ throw failedUpdates(partition)
+ } else false
case None =>
- throw new IllegalStateException(s"Cannot update leader epoch for partition $topicAndPartition as " +
+ throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +
"leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist")
- true
}
}
finalLeaderIsrAndControllerEpoch
@@ -1194,654 +1031,565 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
}
}
- def getControllerID(): Int = {
- controllerContext.zkUtils.readDataMaybeNull(ZkUtils.ControllerPath)._1 match {
- case Some(controller) => KafkaController.parseControllerId(controller)
- case None => -1
- }
- }
-
- case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent {
+ case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
- def state = ControllerState.BrokerChange
+ def state = ControllerState.AutoLeaderBalance
override def process(): Unit = {
if (!isActive) return
- // Read the current broker list from ZK again instead of using currentBrokerList to increase
- // the odds of processing recent broker changes in a single ControllerEvent (KAFKA-5502).
- val curBrokers = zkUtils.getAllBrokersInCluster().toSet
- val curBrokerIds = curBrokers.map(_.id)
- val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
- val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
- val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
- val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
- controllerContext.liveBrokers = curBrokers
- val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
- val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
- val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
- info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
- .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
- newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
- deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
- if (newBrokerIds.nonEmpty)
- onBrokerStartup(newBrokerIdsSorted)
- if (deadBrokerIds.nonEmpty)
- onBrokerFailure(deadBrokerIdsSorted)
+ try {
+ checkAndTriggerAutoLeaderRebalance()
+ } finally {
+ scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
+ }
}
}
- case class TopicChange(topics: Set[String]) extends ControllerEvent {
+ case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
- def state = ControllerState.TopicChange
+ def state = ControllerState.ControlledShutdown
override def process(): Unit = {
- if (!isActive) return
- val newTopics = topics -- controllerContext.allTopics
- val deletedTopics = controllerContext.allTopics -- topics
- controllerContext.allTopics = topics
-
- val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
- controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
- !deletedTopics.contains(p._1.topic))
- controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
- info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
- deletedTopics, addedPartitionReplicaAssignment))
- if (newTopics.nonEmpty)
- onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
+ val controlledShutdownResult = Try { doControlledShutdown(id) }
+ controlledShutdownCallback(controlledShutdownResult)
}
- }
- case class PartitionModifications(topic: String) extends ControllerEvent {
+ private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
+ if (!isActive) {
+ throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+ }
- def state = ControllerState.TopicChange
+ info("Shutting down broker " + id)
- override def process(): Unit = {
- if (!isActive) return
- val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
- val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
- !controllerContext.partitionReplicaAssignment.contains(p._1))
- if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
- error("Skipping adding partitions %s for topic %s since it is currently being deleted"
- .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
- else {
- if (partitionsToBeAdded.nonEmpty) {
- info(s"New partitions to be added $partitionsToBeAdded")
- controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
- onNewPartitionCreation(partitionsToBeAdded.keySet)
+ if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
+ throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+
+ controllerContext.shuttingDownBrokerIds.add(id)
+ debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
+ debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
+
+ val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter { partition =>
+ controllerContext.partitionReplicaAssignment(partition).size > 1 && controllerContext.partitionLeadershipInfo.contains(partition)
+ }
+ val (partitionsLeadByBroker, partitionsFollowedByBroker) = partitionsToActOn.partition { partition =>
+ controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
+ }
+ partitionStateMachine.handleStateChanges(partitionsLeadByBroker.toSeq, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
+ try {
+ brokerRequestBatch.newBatch()
+ partitionsFollowedByBroker.foreach { partition =>
+ brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition.topic,
+ partition.partition, deletePartition = false, null)
}
+ brokerRequestBatch.sendRequestsToBrokers(epoch)
+ } catch {
+ case e: IllegalStateException =>
+ handleIllegalState(e)
+ }
+ // If the broker is a follower, update the ISR in ZK and notify the current leader
+ replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition.topic, partition.partition, id)).toSeq, OfflineReplica)
+ def replicatedPartitionsBrokerLeads() = {
+ trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+ controllerContext.partitionLeadershipInfo.filter {
+ case (topicAndPartition, leaderIsrAndControllerEpoch) =>
+ leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+ }.keys
}
+ replicatedPartitionsBrokerLeads().toSet
}
}
- case class TopicDeletion(var topicsToBeDeleted: Set[String]) extends ControllerEvent {
+ case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
- def state = ControllerState.TopicDeletion
+ def state = ControllerState.LeaderAndIsrResponseReceived
override def process(): Unit = {
+ import JavaConverters._
if (!isActive) return
- debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
- val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
- if (nonExistentTopics.nonEmpty) {
- warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
- nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
+ val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
+
+ if (leaderAndIsrResponse.error != Errors.NONE) {
+ stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
+ return
}
- topicsToBeDeleted --= nonExistentTopics
- if (config.deleteTopicEnable) {
- if (topicsToBeDeleted.nonEmpty) {
- info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
- // mark topic ineligible for deletion if other state changes are in progress
- topicsToBeDeleted.foreach { topic =>
- val partitionReassignmentInProgress =
- controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
- if (partitionReassignmentInProgress)
- topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
- }
- // add topic to deletion list
- topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
- }
- } else {
- // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
- for (topic <- topicsToBeDeleted) {
- info(s"Removing ${getDeleteTopicPath(topic)} since delete topic is disabled")
- zkUtils.deletePath(getDeleteTopicPath(topic))
- }
+
+ val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
+ new TopicAndPartition(_)).toSet
+ val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
+ new TopicAndPartition(_)).toSet
+ val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
+ val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
+ controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
+ val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
+
+ if (newOfflineReplicas.nonEmpty) {
+ stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
+ onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
}
}
}
- case class PartitionReassignment(partitionReassignment: Map[TopicAndPartition, Seq[Int]]) extends ControllerEvent {
+ case class TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
- def state = ControllerState.PartitionReassignment
+ def state = ControllerState.TopicDeletion
override def process(): Unit = {
+ import JavaConverters._
if (!isActive) return
- val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
- partitionsToBeReassigned.foreach { partitionToBeReassigned =>
- if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
- error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
- .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
- removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
- } else {
- val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
- initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
- }
+ val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
+ debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
+ val responseMap = stopReplicaResponse.responses.asScala
+ val partitionsInError =
+ if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
+ else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
+ val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
+ // move all the failed replicas to ReplicaDeletionIneligible
+ topicDeletionManager.failReplicaDeletion(replicasInError)
+ if (replicasInError.size != responseMap.size) {
+ // some replicas could have been successfully deleted
+ val deletedReplicas = responseMap.keySet -- partitionsInError
+ topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
}
}
-
}
- case class PartitionReassignmentIsrChange(topicAndPartition: TopicAndPartition, reassignedReplicas: Set[Int]) extends ControllerEvent {
+ case object Startup extends ControllerEvent {
- def state = ControllerState.PartitionReassignment
+ def state = ControllerState.ControllerChange
override def process(): Unit = {
- if (!isActive) return
- // check if this partition is still being reassigned or not
- controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignedPartitionContext =>
- // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
- val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
- newLeaderAndIsrOpt match {
- case Some(leaderAndIsr) => // check if new replicas have joined ISR
- val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
- if(caughtUpReplicas == reassignedReplicas) {
- // resume the partition reassignment process
- info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
- .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
- "Resuming partition reassignment")
- onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
- }
- else {
- info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
- .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
- "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
- }
- case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
- .format(topicAndPartition, reassignedReplicas.mkString(",")))
- }
- }
+ kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+ elect()
}
- }
- case class IsrChangeNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+ }
- def state = ControllerState.IsrChange
+ private def updateMetrics(): Unit = {
+ offlinePartitionCount =
+ if (!isActive) {
+ 0
+ } else {
+ controllerContext.partitionLeadershipInfo.count { case (tp, leadershipInfo) =>
+ !controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader) &&
+ !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
+ }
+ }
- override def process(): Unit = {
- // Read the current isr change notification znodes from ZK again instead of using sequenceNumbers
- // to increase the odds of processing recent isr changes in a single ControllerEvent
- // and to reduce the odds of trying to access znodes that have already been deleted (KAFKA-5879).
- val currentSequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.IsrChangeNotificationPath)
- if (!isActive) return
- try {
- val topicAndPartitions = currentSequenceNumbers.flatMap(getTopicAndPartition).toSet
- if (topicAndPartitions.nonEmpty) {
- updateLeaderAndIsrCache(topicAndPartitions)
- processUpdateNotifications(topicAndPartitions)
+ preferredReplicaImbalanceCount =
+ if (!isActive) {
+ 0
+ } else {
+ controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
+ val preferredReplica = replicas.head
+ val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
+ leadershipInfo.map(_.leaderAndIsr.leader != preferredReplica).getOrElse(false) &&
+ !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
}
- } finally {
- // delete the notifications
- currentSequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
}
- }
- private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
- val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
- debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
- sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
- }
+ globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size
- private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
- val changeZnode = ZkUtils.IsrChangeNotificationPath + "/" + child
- val (jsonOpt, _) = controllerContext.zkUtils.readDataMaybeNull(changeZnode)
- jsonOpt.map { json =>
- Json.parseFull(json) match {
- case Some(js) =>
- val isrChanges = js.asJsonObject
- isrChanges("partitions").asJsonArray.iterator.map(_.asJsonObject).map { tpJs =>
- val topic = tpJs("topic").to[String]
- val partition = tpJs("partition").to[Int]
- TopicAndPartition(topic, partition)
- }.toSet
- case None =>
- error(s"Invalid topic and partition JSON in ZK. ZK notification node: $changeZnode, JSON: $json")
- Set.empty[TopicAndPartition]
- }
- }.getOrElse(Set.empty[TopicAndPartition])
- }
+ globalPartitionCount = if (!isActive) 0 else controllerContext.partitionLeadershipInfo.size
+ }
+
+ // visible for testing
+ private[controller] def handleIllegalState(e: IllegalStateException): Nothing = {
+ // Resign if the controller is in an illegal state
+ error("Forcing the controller to resign")
+ brokerRequestBatch.clear()
+ triggerControllerMove()
+ throw e
+ }
+
+ private def triggerControllerMove(): Unit = {
+ onControllerResignation()
+ activeControllerId = -1
+ kafkaControllerZkUtils.deleteController()
+ }
+ def expire(): Unit = {
+ val expireEvent = Expire()
+ eventManager.clearAndPut(expireEvent)
+ expireEvent.waitUntilProcessed()
}
- case class LogDirEventNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+ def newSession(): Unit = {
+ eventManager.put(Reelect)
+ }
- def state = ControllerState.LogDirChange
+ def elect(): Unit = {
+ val timestamp = time.milliseconds
+ activeControllerId = kafkaControllerZkUtils.getControllerId.getOrElse(-1)
+ /*
+ * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
+ * it's possible that the controller has already been elected when we get here. This check will prevent the following
+ * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
+ */
+ if (activeControllerId != -1) {
+ debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
+ return
+ }
- override def process(): Unit = {
- val zkUtils = controllerContext.zkUtils
- try {
- val brokerIds = sequenceNumbers.flatMap(LogDirUtils.getBrokerIdFromLogDirEvent(zkUtils, _))
- onBrokerLogDirFailure(brokerIds)
- } finally {
- // delete processed children
- sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x))
- }
+ try {
+ kafkaControllerZkUtils.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
+ info(config.brokerId + " successfully elected as the controller")
+ activeControllerId = config.brokerId
+ onControllerFailover()
+ } catch {
+ case _: NodeExistsException =>
+ // If someone else has written the path, then re-read ZK to find out which broker won the election
+ activeControllerId = kafkaControllerZkUtils.getControllerId.getOrElse(-1)
+
+ if (activeControllerId != -1)
+ debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
+ else
+ warn("A controller has been elected but just resigned, this will result in another round of election")
+
+ case e2: Throwable =>
+ error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
+ triggerControllerMove()
}
}
- case class PreferredReplicaLeaderElection(partitions: Set[TopicAndPartition]) extends ControllerEvent {
-
- def state = ControllerState.ManualLeaderBalance
+ case object BrokerChange extends ControllerEvent {
+ override def state: ControllerState = ControllerState.BrokerChange
override def process(): Unit = {
if (!isActive) return
- val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
- if (partitionsForTopicsToBeDeleted.nonEmpty) {
- error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
- .format(partitionsForTopicsToBeDeleted))
- }
- onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
+ val curBrokers = kafkaControllerZkUtils.getAllBrokersInCluster.toSet
+ val curBrokerIds = curBrokers.map(_.id)
+ val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
+ val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
+ val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
+ val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
+ controllerContext.liveBrokers = curBrokers
+ val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
+ val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
+ val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
+ info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
+ .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
+ newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
+ deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
+ if (newBrokerIds.nonEmpty)
+ onBrokerStartup(newBrokerIdsSorted)
+ if (deadBrokerIds.nonEmpty)
+ onBrokerFailure(deadBrokerIdsSorted)
}
-
}
- case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
+ case object TopicChange extends ControllerEvent {
+ override def state: ControllerState = ControllerState.TopicChange
- def state = ControllerState.AutoLeaderBalance
+ override def process(): Unit = {
+ if (!isActive) return
+ val topics = kafkaControllerZkUtils.getAllTopicsInCluster.toSet
+ val newTopics = topics -- controllerContext.allTopics
+ val deletedTopics = controllerContext.allTopics -- topics
+ controllerContext.allTopics = topics
+
+ registerPartitionModificationsHandlers(newTopics.toSeq)
+ val addedPartitionReplicaAssignment = kafkaControllerZkUtils.getReplicaAssignmentForTopics(newTopics)
+ controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
+ !deletedTopics.contains(p._1.topic))
+ controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
+ info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
+ deletedTopics, addedPartitionReplicaAssignment))
+ if (addedPartitionReplicaAssignment.nonEmpty)
+ onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
+ }
+ }
+
+ case object LogDirEventNotification extends ControllerEvent {
+ override def state: ControllerState = ControllerState.LogDirChange
override def process(): Unit = {
if (!isActive) return
+ val sequenceNumbers = kafkaControllerZkUtils.getAllLogDirEventNotifications
try {
- checkAndTriggerAutoLeaderRebalance()
+ val brokerIds = kafkaControllerZkUtils.getBrokerIdsFromLogDirEvents(sequenceNumbers)
+ onBrokerLogDirFailure(brokerIds)
} finally {
- scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
+ // delete processed children
+ kafkaControllerZkUtils.deleteLogDirEventNotifications(sequenceNumbers)
}
}
}
- case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
-
- def state = ControllerState.ControlledShutdown
+ case class PartitionModifications(topic: String) extends ControllerEvent {
+ override def state: ControllerState = ControllerState.TopicChange
override def process(): Unit = {
- val controlledShutdownResult = Try { doControlledShutdown(id) }
- controlledShutdownCallback(controlledShutdownResult)
- }
-
- private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
- if (!isActive) {
- throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+ if (!isActive) return
+ val partitionReplicaAssignment = kafkaControllerZkUtils.getReplicaAssignmentForTopics(immutable.Set(topic))
+ val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
+ !controllerContext.partitionReplicaAssignment.contains(p._1))
+ if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
+ error("Skipping adding partitions %s for topic %s since it is currently being deleted"
+ .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+ else {
+ if (partitionsToBeAdded.nonEmpty) {
+ info(s"New partitions to be added $partitionsToBeAdded")
+ controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
+ onNewPartitionCreation(partitionsToBeAdded.keySet)
+ }
}
+ }
+ }
- info("Shutting down broker " + id)
-
- if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
- throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+ case object TopicDeletion extends ControllerEvent {
+ override def state: ControllerState = ControllerState.TopicDeletion
- controllerContext.shuttingDownBrokerIds.add(id)
- debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
- debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
-
- val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
- controllerContext.partitionsOnBroker(id)
- .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
-
- allPartitionsAndReplicationFactorOnBroker.foreach { case (topicAndPartition, replicationFactor) =>
- controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
- if (replicationFactor > 1) {
- if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
- // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
- // notifies all affected brokers
- partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
- controlledShutdownPartitionLeaderSelector)
- } else {
- // Stop the replica first. The state change below initiates ZK changes which should take some time
- // before which the stop replica request should be completed (in most cases)
- try {
- brokerRequestBatch.newBatch()
- brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
- topicAndPartition.partition, deletePartition = false)
- brokerRequestBatch.sendRequestsToBrokers(epoch)
- } catch {
- case e: IllegalStateException =>
- handleIllegalState(e)
- }
- // If the broker is a follower, updates the isr in ZK and notifies the current leader
- replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
- topicAndPartition.partition, id)), OfflineReplica)
- }
+ override def process(): Unit = {
+ if (!isActive) return
+ var topicsToBeDeleted = kafkaControllerZkUtils.getTopicDeletions.toSet
+ debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
+ val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
+ if (nonExistentTopics.nonEmpty) {
+ warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
+ kafkaControllerZkUtils.deleteTopicDeletions(nonExistentTopics.toSeq)
+ }
+ topicsToBeDeleted --= nonExistentTopics
+ if (config.deleteTopicEnable) {
+ if (topicsToBeDeleted.nonEmpty) {
+ info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
+ // mark topic ineligible for deletion if other state changes are in progress
+ topicsToBeDeleted.foreach { topic =>
+ val partitionReassignmentInProgress =
+ controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+ if (partitionReassignmentInProgress)
+ topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
}
+ // add topic to deletion list
+ topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
+ } else {
+ // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
+ info(s"Removing $topicsToBeDeleted since delete topic is disabled")
+ kafkaControllerZkUtils.deleteTopicDeletions(topicsToBeDeleted.toSeq)
}
- def replicatedPartitionsBrokerLeads() = {
- trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
- controllerContext.partitionLeadershipInfo.filter {
- case (topicAndPartition, leaderIsrAndControllerEpoch) =>
- leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
- }.keys
- }
- replicatedPartitionsBrokerLeads().toSet
}
}
- case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
-
- def state = ControllerState.LeaderAndIsrResponseReceived
+ case object PartitionReassignment extends ControllerEvent {
+ override def state: ControllerState = ControllerState.PartitionReassignment
override def process(): Unit = {
- import JavaConverters._
- val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
-
- if (leaderAndIsrResponse.error != Errors.NONE) {
- stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
- return
+ if (!isActive) return
+ kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
+ val partitionReassignment = kafkaControllerZkUtils.getPartitionReassignment
+ val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
+ partitionsToBeReassigned.foreach { partitionToBeReassigned =>
+ if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
+ error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
+ .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
+ removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+ } else {
+ val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
+ initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+ }
}
+ }
+ }
- val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
- new TopicAndPartition(_)).toSet
- val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
- new TopicAndPartition(_)).toSet
- val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
- val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
- controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
- val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
+ case class PartitionReassignmentIsrChange(partition: TopicAndPartition) extends ControllerEvent {
+ override def state: ControllerState = ControllerState.PartitionReassignment
- if (newOfflineReplicas.nonEmpty) {
- stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
- onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
+ override def process(): Unit = {
+ if (!isActive) return
+ // check if this partition is still being reassigned or not
+ controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
+ val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
+ kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+ case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
+ val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+ val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
+ if(caughtUpReplicas == reassignedReplicas) {
+ // resume the partition reassignment process
+ info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+ .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
+ "Resuming partition reassignment")
+ onPartitionReassignment(partition, reassignedPartitionContext)
+ }
+ else {
+ info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+ .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
+ "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+ }
+ case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+ .format(partit
<TRUNCATED>