Posted to commits@kafka.apache.org by ok...@apache.org on 2017/11/09 21:52:47 UTC

kafka git commit: MINOR: make controller helper methods private

Repository: kafka
Updated Branches:
  refs/heads/trunk 3e86161b8 -> 564d7b365


MINOR: make controller helper methods private

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Viktor Somogyi <vi...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #4198 from onurkaraman/make-controller-helper-methods-private
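
The net effect is a smaller public surface on KafkaController: only the lifecycle and
controlled-shutdown entry points stay public, a few send/unregister helpers are narrowed to
private[controller], and the remaining helpers become private. As a rough summary of what the
diff below leaves callable from outside the controller package (a hypothetical trait written
here for illustration only, not something added by this commit; members the diff does not
touch are omitted):

    import scala.util.Try
    import org.apache.kafka.common.TopicPartition

    // Illustrative only: the KafkaController methods this diff leaves public.
    trait ControllerPublicSurface {
      def isActive: Boolean
      def epoch: Int
      def startup(): Unit
      def shutdown(): Unit
      def controlledShutdown(id: Int, callback: Try[Set[TopicPartition]] => Unit): Unit
    }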


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/564d7b36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/564d7b36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/564d7b36

Branch: refs/heads/trunk
Commit: 564d7b365d102ffbeb24e2421ab6319db311e6b5
Parents: 3e86161
Author: Onur Karaman <ok...@linkedin.com>
Authored: Thu Nov 9 13:51:32 2017 -0800
Committer: Onur Karaman <ok...@linkedin.com>
Committed: Thu Nov 9 13:51:32 2017 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 130 +++++++++----------
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   8 +-
 3 files changed, 65 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/564d7b36/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 845ac63..f0ca9e7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -122,29 +122,60 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   )
 
+  /**
+   * Returns true if this broker is the current controller.
+   */
+  def isActive: Boolean = activeControllerId == config.brokerId
+
   def epoch: Int = controllerContext.epoch
 
-  def state: ControllerState = eventManager.state
+  /**
+   * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
+   * is the controller. It merely registers the session expiration listener and starts the controller leader
+   * elector
+   */
+  def startup() = {
+    zkClient.registerStateChangeHandler(new StateChangeHandler {
+      override val name: String = StateChangeHandlers.ControllerHandler
+      override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
+      override def afterInitializingSession(): Unit = {
+        eventManager.put(Reelect)
+      }
+      override def beforeInitializingSession(): Unit = {
+        val expireEvent = new Expire
+        eventManager.clearAndPut(expireEvent)
+        expireEvent.waitUntilProcessed()
+      }
+    })
+    eventManager.put(Startup)
+    eventManager.start()
+  }
 
-  def clientId: String = {
-    val controllerListener = config.listeners.find(_.listenerName == config.interBrokerListenerName).getOrElse(
-      throw new IllegalArgumentException(s"No listener with name ${config.interBrokerListenerName} is configured."))
-    "id_%d-host_%s-port_%d".format(config.brokerId, controllerListener.host, controllerListener.port)
+  /**
+   * Invoked when the controller module of a Kafka server is shutting down. If the broker was the current controller,
+   * it shuts down the partition and replica state machines. If not, those are a no-op. In addition to that, it also
+   * shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
+   */
+  def shutdown() = {
+    eventManager.close()
+    onControllerResignation()
   }
 
   /**
-   * On clean shutdown, the controller first determines the partitions that the
+   * On controlled shutdown, the controller first determines the partitions that the
    * shutting down broker leads, and moves leadership of those partitions to another broker
    * that is in that partition's ISR.
    *
    * @param id Id of the broker to shutdown.
    * @return The number of partitions that the broker still leads.
    */
-  def shutdownBroker(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit = {
+  def controlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit = {
     val controlledShutdownEvent = ControlledShutdown(id, controlledShutdownCallback)
     eventManager.put(controlledShutdownEvent)
   }
 
+  private def state: ControllerState = eventManager.state
+
   /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
@@ -158,7 +189,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
    * This ensures another controller election will be triggered and there will always be an actively serving controller
    */
-  def onControllerFailover() {
+  private def onControllerFailover() {
     info("Reading controller epoch from ZooKeeper")
     readControllerEpochFromZooKeeper()
     info("Incrementing controller epoch in ZooKeeper")
@@ -214,7 +245,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
    * required to clean up internal controller data structures
    */
-  def onControllerResignation() {
+  private def onControllerResignation() {
     debug("Resigning")
     // de-register listeners
     zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
@@ -248,17 +279,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info("Resigned")
   }
 
-  /**
-   * Returns true if this broker is the current controller.
-   */
-  def isActive: Boolean = activeControllerId == config.brokerId
-
   /*
    * This callback is invoked by the controller's LogDirEventNotificationListener with the list of broker ids who
    * have experienced new log directory failures. In response the controller should send LeaderAndIsrRequest
    * to all these brokers to query the state of their replicas
    */
-  def onBrokerLogDirFailure(brokerIds: Seq[Int]) {
+  private def onBrokerLogDirFailure(brokerIds: Seq[Int]) {
     // send LeaderAndIsrRequest for all replicas on those brokers to see if they are still online.
     val replicasOnBrokers = controllerContext.replicasOnBrokers(brokerIds.toSet)
     replicaStateMachine.handleStateChanges(replicasOnBrokers.toSeq, OnlineReplica)
@@ -278,7 +304,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * 2. Even if we do refresh the cache, there is no guarantee that by the time the leader and ISR request reaches
    *    every broker that it is still valid.  Brokers check the leader epoch to determine validity of the request.
    */
-  def onBrokerStartup(newBrokers: Seq[Int]) {
+  private def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
     newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val newBrokersSet = newBrokers.toSet
@@ -314,7 +340,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
   * as input. It will call onReplicasBecomeOffline(...) with the list of replicas on those failed brokers as input.
    */
-  def onBrokerFailure(deadBrokers: Seq[Int]) {
+  private def onBrokerFailure(deadBrokers: Seq[Int]) {
     info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
     deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val deadBrokersThatWereShuttingDown =
@@ -335,7 +361,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     * the partition state machine will refresh our cache for us when performing leader election for all new/offline
     * partitions coming online.
     */
-  def onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit = {
+  private def onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit = {
     val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
       newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
 
@@ -371,7 +397,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * 1. Move the newly created partitions to the NewPartition state
    * 2. Move the newly created partitions from NewPartition->OnlinePartition state
    */
-  def onNewPartitionCreation(newPartitions: Set[TopicPartition]) {
+  private def onNewPartitionCreation(newPartitions: Set[TopicPartition]) {
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
     replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
@@ -420,7 +446,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
    * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
    * This way, if the controller crashes before that step, we can still recover.
    */
-  def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
+  private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {
       info(s"New replicas ${reassignedReplicas.mkString(",")} for partition $topicPartition being reassigned not yet " +
@@ -471,7 +497,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
   }
 
-  def initiateReassignReplicasForTopicPartition(topicPartition: TopicPartition,
+  private def initiateReassignReplicasForTopicPartition(topicPartition: TopicPartition,
                                                 reassignedPartitionContext: ReassignedPartitionsContext) {
     val newReplicas = reassignedPartitionContext.newReplicas
     val topic = topicPartition.topic
@@ -501,7 +527,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
+  private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
@@ -512,38 +538,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  /**
-   * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
-   * is the controller. It merely registers the session expiration listener and starts the controller leader
-   * elector
-   */
-  def startup() = {
-    zkClient.registerStateChangeHandler(new StateChangeHandler {
-      override val name: String = StateChangeHandlers.ControllerHandler
-      override def onReconnectionTimeout(): Unit = error("Reconnection timeout.")
-      override def afterInitializingSession(): Unit = newSession()
-      override def beforeInitializingSession(): Unit = expire()
-    })
-    eventManager.put(Startup)
-    eventManager.start()
-  }
-
-  /**
-   * Invoked when the controller module of a Kafka server is shutting down. If the broker was the current controller,
-   * it shuts down the partition and replica state machines. If not, those are a no-op. In addition to that, it also
-   * shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
-   */
-  def shutdown() = {
-    eventManager.close()
-    onControllerResignation()
-  }
-
-  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
-                  callback: AbstractResponse => Unit = null) = {
-    controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, request, callback)
-  }
-
-  def incrementControllerEpoch(): Unit = {
+  private def incrementControllerEpoch(): Unit = {
     val newControllerEpoch = controllerContext.epoch + 1
     val setDataResponse = zkClient.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
     setDataResponse.resultCode match {
@@ -667,7 +662,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.controllerChannelManager.startup()
   }
 
-  def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
+  private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
     val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
     leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
       controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -777,7 +772,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
   }
 
-  def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
+  private[controller] def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
     topics.foreach { topic =>
       partitionModificationsHandlers.remove(topic)
         .foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
@@ -800,7 +795,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
+  private def removePartitionFromReassignedPartitions(topicPartition: TopicPartition) {
     controllerContext.partitionsBeingReassigned.get(topicPartition).foreach { reassignContext =>
       // stop watching the ISR changes for this partition
       zkClient.unregisterZNodeChangeHandler(reassignContext.reassignIsrChangeHandler.path)
@@ -825,7 +820,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.partitionsBeingReassigned.remove(topicPartition)
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition],
+  private def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition],
                                                    isTriggeredByAutoRebalance : Boolean) {
     for (partition <- partitionsToBeRemoved) {
       // check the status
@@ -844,13 +839,18 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
+  private[controller] def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
+                                      callback: AbstractResponse => Unit = null) = {
+    controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, request, callback)
+  }
+
   /**
    * Send the leader information for selected partitions to selected brokers so that they can correctly respond to
    * metadata requests
    *
    * @param brokers The brokers that the update metadata request should be sent to
    */
-  def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition] = Set.empty[TopicPartition]) {
+  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition] = Set.empty[TopicPartition]) {
     try {
       brokerRequestBatch.newBatch()
       brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
@@ -1118,17 +1118,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     zkClient.deleteController()
   }
 
-  def expire(): Unit = {
-    val expireEvent = new Expire
-    eventManager.clearAndPut(expireEvent)
-    expireEvent.waitUntilProcessed()
-  }
-
-  def newSession(): Unit = {
-    eventManager.put(Reelect)
-  }
-
-  def elect(): Unit = {
+  private def elect(): Unit = {
     val timestamp = time.milliseconds
     activeControllerId = zkClient.getControllerId.getOrElse(-1)
     /*

http://git-wip-us.apache.org/repos/asf/kafka/blob/564d7b36/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b9a1971..ced3f0b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -245,7 +245,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       sendResponseExemptThrottle(request, response)
     }
-    controller.shutdownBroker(controlledShutdownRequest.brokerId, controlledShutdownCallback)
+    controller.controlledShutdown(controlledShutdownRequest.brokerId, controlledShutdownCallback)
   }
 
   /**

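The KafkaApis change above is purely the call-site rename; the handler still hands the controller a
Try-based callback that fires once the ControlledShutdown event has been processed. A minimal sketch
of that adapter shape, where sendResponse and handleError are hypothetical placeholders for the real
response plumbing in KafkaApis:

    import scala.util.{Failure, Success, Try}
    import org.apache.kafka.common.TopicPartition
    import kafka.controller.KafkaController

    // Sketch: adapt the controller's Try result into a success or error path.
    // sendResponse and handleError stand in for KafkaApis' actual response helpers.
    def handleControlledShutdownRequest(controller: KafkaController,
                                        brokerId: Int,
                                        sendResponse: Set[TopicPartition] => Unit,
                                        handleError: Throwable => Unit): Unit = {
      val callback: Try[Set[TopicPartition]] => Unit = {
        case Success(partitionsRemaining) => sendResponse(partitionsRemaining)
        case Failure(throwable)           => handleError(throwable)
      }
      controller.controlledShutdown(brokerId, callback)
    }
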
http://git-wip-us.apache.org/repos/asf/kafka/blob/564d7b36/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 2f520a3..0c9bd6e 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -357,7 +357,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
   }
 
   @Test
-  def testShutdownBroker() {
+  def testControlledShutdown() {
     val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
     val topic = "test"
     val partition = 1
@@ -371,7 +371,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
     val resultQueue = new LinkedBlockingQueue[Try[Set[TopicPartition]]]()
     val controlledShutdownCallback = (controlledShutdownResult: Try[Set[TopicPartition]]) => resultQueue.put(controlledShutdownResult)
-    controller.shutdownBroker(2, controlledShutdownCallback)
+    controller.controlledShutdown(2, controlledShutdownCallback)
     var partitionsRemaining = resultQueue.take().get
     var activeServers = servers.filter(s => s.config.brokerId != 2)
     // wait for the update metadata request to trickle to the brokers
@@ -385,7 +385,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
     assertEquals(List(0,1), partitionStateInfo.basePartitionState.isr.asScala)
 
-    controller.shutdownBroker(1, controlledShutdownCallback)
+    controller.controlledShutdown(1, controlledShutdownCallback)
     partitionsRemaining = resultQueue.take().get
     assertEquals(0, partitionsRemaining.size)
     activeServers = servers.filter(s => s.config.brokerId == 0)
@@ -394,7 +394,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     assertEquals(0, leaderAfterShutdown)
 
     assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
-    controller.shutdownBroker(0, controlledShutdownCallback)
+    controller.controlledShutdown(0, controlledShutdownCallback)
     partitionsRemaining = resultQueue.take().get
     assertEquals(1, partitionsRemaining.size)
     // leader doesn't change since all the replicas are shut down
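
The test exercises the renamed method through a small blocking idiom: the callback drops the Try into
a LinkedBlockingQueue and the test thread takes from it, turning the asynchronous controller event into
a synchronous result. A self-contained sketch of that idiom (the controller value is assumed to come
from a running test harness, as in the test above):

    import java.util.concurrent.LinkedBlockingQueue
    import scala.util.Try
    import org.apache.kafka.common.TopicPartition
    import kafka.controller.KafkaController

    // Blocks until the controller has processed the ControlledShutdown event,
    // then returns the partitions the broker still leads (rethrowing on failure).
    def controlledShutdownAndAwait(controller: KafkaController, brokerId: Int): Set[TopicPartition] = {
      val resultQueue = new LinkedBlockingQueue[Try[Set[TopicPartition]]]()
      controller.controlledShutdown(brokerId, result => resultQueue.put(result))
      resultQueue.take().get
    }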