Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/18 16:15:07 UTC

[5/5] kafka git commit: KAFKA-5642; Use async ZookeeperClient in Controller

KAFKA-5642; Use async ZookeeperClient in Controller

Kafka today uses ZkClient, a wrapper client around the raw ZooKeeper client. This library only exposes synchronous APIs to the user. Synchronous APIs mean we must wait for an entire round trip before issuing the next operation.

This becomes problematic in partition-heavy clusters, where the controller spends a significant amount of time just sending many sequential, per-partition reads and writes to ZooKeeper. This is especially an issue during:
- controller failover, where the newly elected controller effectively reads all ZooKeeper state.
- broker failures and controlled shutdown. The controller tries to elect a new leader for partitions previously led by the broker. The controller also removes the broker from the ISR of partitions for which the broker was a follower. These all incur partition-granular reads and writes to ZooKeeper.

As a first step in addressing these issues, we built a low-level wrapper client called ZookeeperClient in KAFKA-5501 that encourages the use of pipelined, asynchronous APIs.
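To illustrate why pipelining helps, here is a minimal Scala sketch assuming a hypothetical callback-based client. `FakeAsyncZk`, `readSequentially` and `readPipelined` are illustrative names, not the ZookeeperClient API from KAFKA-5501. The sequential style waits for each simulated round trip before issuing the next read. The pipelined style issues every read up front and then waits once on a `CountDownLatch`.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for an asynchronous ZooKeeper-style client: each read
// completes on a background thread after a simulated network round trip.
class FakeAsyncZk(data: Map[String, String], roundTripMs: Long) {
  private val scheduler = Executors.newScheduledThreadPool(4)

  def getDataAsync(path: String)(callback: (String, Option[String]) => Unit): Unit = {
    scheduler.schedule(new Runnable {
      override def run(): Unit = callback(path, data.get(path))
    }, roundTripMs, TimeUnit.MILLISECONDS)
    ()
  }

  def close(): Unit = scheduler.shutdown()
}

object PipelinedReads {
  // Synchronous style: block on each round trip before issuing the next read.
  def readSequentially(zk: FakeAsyncZk, paths: Seq[String]): Map[String, Option[String]] =
    paths.map { path =>
      val latch = new CountDownLatch(1)
      val result = new AtomicReference[Option[String]](None)
      zk.getDataAsync(path) { (_, value) => result.set(value); latch.countDown() }
      latch.await()
      path -> result.get
    }.toMap

  // Pipelined style: issue every read up front, then wait once for all responses.
  def readPipelined(zk: FakeAsyncZk, paths: Seq[String]): Map[String, Option[String]] = {
    val results = new ConcurrentHashMap[String, Option[String]]()
    val latch = new CountDownLatch(paths.size)
    paths.foreach { path =>
      zk.getDataAsync(path) { (p, value) => results.put(p, value); latch.countDown() }
    }
    latch.await()
    paths.map(p => p -> results.get(p)).toMap
  }
}
```

With N reads and a round-trip time of R, the sequential version takes roughly N * R. The pipelined version takes roughly R plus processing overhead.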

This patch converts the controller to use the async ZookeeperClient to improve controller failover, broker failure handling, and controlled shutdown times.

Some notable changes made in this patch:
- All ControllerEvents now defer ZooKeeper access until processing time instead of reading at enqueue time, as was intended by the single-threaded event queue model from KAFKA-5028. As a result, each event sees a fresh view of the ZooKeeper state when it is processed. This reverts the hacks from KAFKA-5502 and KAFKA-5879.
- We refactored PartitionStateMachine and ReplicaStateMachine to process multiple partitions and replicas in batch rather than one at a time, so that a batch of requests can be sent to ZookeeperClient and pipelined.
- We've decoupled ZookeeperClient handler registration from watcher registration. Previously the two were coupled, so registering a handler actually sent a request to the ZooKeeper ensemble to register the watcher. In KafkaController.onControllerFailover, we register partition modification handlers (thereby registering watchers) and additionally look up the partition assignments for every topic in the cluster. Merging these two operations shaves some time off failover. ZookeeperClient's registration APIs are therefore now purely in-memory operations, and they only take effect when the client sends an ExistsRequest, GetDataRequest, or GetChildrenRequest.
- We've simplified the logic for updating LeaderAndIsr: if we get a BADVERSION error code, the controller now just retries in the next round by reading the new state and attempting the update again. This simplifies the logic for updating the partition leader epoch, removing replicas from the ISR, and electing leaders for partitions.
- We've implemented KAFKA-5083: always leave the last surviving member of the ISR in ZK. This means that even if unclean leader election has been re-disabled, we can still try to elect the leader from the last in-sync replica.
- ZookeeperClient's handlers have been changed so that their methods default to no-ops for convenience.
- All znode paths and definitions for znode encoding and decoding have been consolidated as static methods in ZkData.scala.
- The partition leader election algorithms have been refactored as pure functions so that they can be easily unit tested.
- PartitionStateMachine and ReplicaStateMachine now have unit tests.
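The bullet about pure leader-election functions can be illustrated with a sketch. The object and method names below are hypothetical. The selection rules are assumptions of this sketch, not a copy of the patch:
- prefer the first live in-sync replica in assignment order;
- fall back to any live replica only if unclean election is enabled;
- skip brokers that are shutting down.

Because these functions take plain collections and return an `Option`, they can be unit tested without ZooKeeper or a running controller.

```scala
// Hypothetical pure leader-election helpers: no ZooKeeper, no controller state,
// just the inputs needed to pick a leader, so each rule is easy to unit test.
object LeaderElectionSketch {
  private def liveInSync(id: Int, isr: Seq[Int], liveReplicas: Set[Int]): Boolean =
    liveReplicas.contains(id) && isr.contains(id)

  // Offline partition: first live in-sync replica in assignment order; fall back
  // to the first live replica only when unclean leader election is enabled.
  def offlinePartitionLeader(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int],
                             uncleanLeaderElectionEnabled: Boolean): Option[Int] =
    assignment.find(id => liveInSync(id, isr, liveReplicas)).orElse {
      if (uncleanLeaderElectionEnabled) assignment.find(liveReplicas.contains) else None
    }

  // Preferred replica: only the head of the assignment qualifies.
  def preferredReplicaLeader(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] =
    assignment.headOption.filter(id => liveInSync(id, isr, liveReplicas))

  // Controlled shutdown: like the clean case, but skip brokers that are shutting down.
  def controlledShutdownLeader(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int],
                               shuttingDownBrokers: Set[Int]): Option[Int] =
    assignment.find(id => liveInSync(id, isr, liveReplicas) && !shuttingDownBrokers.contains(id))
}
```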
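The BADVERSION retry and the KAFKA-5083 ISR rule can be sketched together. This minimal sketch assumes a hypothetical in-memory store, `VersionedStore`, that mimics ZooKeeper's conditional setData: a write succeeds only if the expected version matches, and it then bumps the version. `LeaderIsr`, the `-1` no-leader sentinel and the helper names are illustrative, not Kafka's classes. Each attempt re-reads the current state and version, so a BADVERSION failure simply leads to a fresh read and another try.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

// Hypothetical, simplified partition state (not Kafka's LeaderAndIsr class).
final case class LeaderIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

sealed trait SetResult
case object Ok extends SetResult
case object BadVersion extends SetResult

// Hypothetical in-memory store mimicking ZooKeeper's versioned setData:
// a write succeeds only if the caller's expected version is current.
class VersionedStore {
  private val data = mutable.Map.empty[String, (LeaderIsr, Int)]

  def create(path: String, value: LeaderIsr): Unit = data.update(path, (value, 0))

  def get(path: String): (LeaderIsr, Int) = data(path)

  def setIfVersion(path: String, value: LeaderIsr, expectedVersion: Int): SetResult = {
    val currentVersion = data(path)._2
    if (currentVersion != expectedVersion) BadVersion
    else { data.update(path, (value, currentVersion + 1)); Ok }
  }
}

object IsrUpdateSketch {
  val NoLeader = -1 // sentinel for "no leader" (an assumption of this sketch)

  // KAFKA-5083 rule: never shrink the ISR to empty; keep its last surviving member.
  def removeReplicaFromIsr(state: LeaderIsr, replicaId: Int): LeaderIsr = {
    val newIsr = if (state.isr.size == 1) state.isr else state.isr.filter(_ != replicaId)
    val newLeader = if (state.leader == replicaId) NoLeader else state.leader
    LeaderIsr(newLeader, state.leaderEpoch + 1, newIsr)
  }

  // On BADVERSION, re-read the latest state and version and try again.
  @tailrec
  def updateWithRetry(store: VersionedStore, path: String, update: LeaderIsr => LeaderIsr,
                      attemptsLeft: Int): Option[LeaderIsr] =
    if (attemptsLeft <= 0) None
    else {
      val (current, version) = store.get(path) // fresh read on every attempt
      val updated = update(current)
      store.setIfVersion(path, updated, version) match {
        case Ok         => Some(updated)
        case BadVersion => updateWithRetry(store, path, update, attemptsLeft - 1)
      }
    }
}
```

Because every attempt starts from a fresh read, the caller never has to reconcile a stale in-memory version with the store. It just tries again.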
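Decoupling handler registration from watcher registration, together with no-op default handler methods, might look roughly like the sketch below. `ZNodeHandler`, `SketchClient` and the request classes are hypothetical stand-ins, and only exists and get-data reads are modeled. Registering a handler touches only an in-memory map. The watch is set as a side effect of a read the controller needed to send anyway, for example reading a topic's partition assignment. As in ZooKeeper, a fired watch is one-shot.

```scala
import scala.collection.mutable

// Hypothetical handler: every callback defaults to a no-op, so an implementation
// overrides only the events it cares about.
trait ZNodeHandler {
  def path: String
  def handleCreation(): Unit = {}
  def handleDeletion(): Unit = {}
  def handleDataChange(): Unit = {}
}

sealed trait ReadRequest { def path: String }
final case class ExistsRequest(path: String) extends ReadRequest
final case class GetDataRequest(path: String) extends ReadRequest

// Hypothetical client: handler registration is purely in-memory; a watch is only
// established as a side effect of a read request that is actually sent.
class SketchClient {
  private val handlers = mutable.Map.empty[String, ZNodeHandler]
  private val watchedPaths = mutable.Set.empty[String]
  private var sent = 0

  def requestsSent: Int = sent

  def registerHandler(handler: ZNodeHandler): Unit = handlers.update(handler.path, handler) // no network I/O

  def unregisterHandler(path: String): Unit = handlers.remove(path)

  def send(request: ReadRequest): Unit = {
    sent += 1
    // The read that was needed anyway doubles as watcher registration.
    if (handlers.contains(request.path)) watchedPaths += request.path
  }

  def isWatched(path: String): Boolean = watchedPaths.contains(path)

  // Simulated notification: watches are one-shot, so the watch is consumed here.
  def fireDataChange(path: String): Unit =
    if (watchedPaths.remove(path)) handlers.get(path).foreach(_.handleDataChange())
}
```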

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #3765 from onurkaraman/KAFKA-5642


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b71ee043
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b71ee043
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b71ee043

Branch: refs/heads/trunk
Commit: b71ee043f8959ee0d4699071ba8fc1e2c5675842
Parents: 68f324f
Author: Onur Karaman <ok...@linkedin.com>
Authored: Wed Oct 18 09:14:59 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 18 09:14:59 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |    2 +-
 .../controller/ControllerChannelManager.scala   |    6 +-
 .../controller/ControllerEventManager.scala     |   17 +-
 .../kafka/controller/KafkaController.scala      | 1406 +++++++-----------
 .../controller/KafkaControllerZkUtils.scala     |  684 +++++++++
 .../controller/PartitionLeaderSelector.scala    |  205 ---
 .../controller/PartitionStateMachine.scala      |  582 +++++---
 .../kafka/controller/ReplicaStateMachine.scala  |  433 +++---
 .../kafka/controller/TopicDeletionManager.scala |   43 +-
 .../main/scala/kafka/controller/ZkData.scala    |  248 +++
 .../kafka/controller/ZookeeperClient.scala      |  127 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   22 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   20 +-
 .../controller/ControllerIntegrationTest.scala  |    2 +-
 .../PartitionLeaderElectionAlgorithmsTest.scala |  176 +++
 .../controller/PartitionStateMachineTest.scala  |  311 ++++
 .../controller/ReplicaStateMachineTest.scala    |  371 +++++
 .../kafka/controller/ZookeeperClientTest.scala  |   42 +-
 .../ControlledShutdownLeaderSelectorTest.scala  |   73 -
 .../unit/kafka/server/LeaderElectionTest.scala  |    2 +-
 20 files changed, 3163 insertions(+), 1609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index 029d476..7a83cf3 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -38,7 +38,7 @@ case class LeaderAndIsr(leader: Int,
 
   def newLeader(leader: Int) = newLeaderAndIsr(leader, isr)
 
-  def newLeaderAndIsr(leader: Int, isr: List[Int]) = LeaderAndIsr(leader, leaderEpoch + 1, isr, zkVersion + 1)
+  def newLeaderAndIsr(leader: Int, isr: List[Int]) = LeaderAndIsr(leader, leaderEpoch + 1, isr, zkVersion)
 
   def newEpochAndZkVersion = newLeaderAndIsr(leader, isr)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 5ac85cc..9fef617 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -314,7 +314,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                                       replicas: Seq[Int], isNew: Boolean = false) {
+                                       replicas: Seq[Int], isNew: Boolean) {
     val topicPartition = new TopicPartition(topic, partition)
 
     brokerIds.filter(_ >= 0).foreach { brokerId =>
@@ -334,7 +334,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
   }
 
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
-                                      callback: (AbstractResponse, Int) => Unit = null) {
+                                      callback: (AbstractResponse, Int) => Unit) {
     brokerIds.filter(b => b >= 0).foreach { brokerId =>
       stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
       val v = stopReplicaRequestMap(brokerId)
@@ -349,7 +349,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
 
   /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
-                                         partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+                                         partitions: collection.Set[TopicAndPartition]) {
 
     def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
       val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/ControllerEventManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index f7ed54e..396a39d 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -18,12 +18,14 @@
 package kafka.controller
 
 import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection._
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.metrics.KafkaTimer
+import kafka.utils.CoreUtils.inLock
 import kafka.utils.ShutdownableThread
 
+import scala.collection._
+
 object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
 }
@@ -31,7 +33,7 @@ class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer
                              eventProcessedListener: ControllerEvent => Unit) {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
-
+  private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[ControllerEvent]
   private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
 
@@ -41,7 +43,14 @@ class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer
 
   def close(): Unit = thread.shutdown()
 
-  def put(event: ControllerEvent): Unit = queue.put(event)
+  def put(event: ControllerEvent): Unit = inLock(putLock) {
+    queue.put(event)
+  }
+
+  def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) {
+    queue.clear()
+    queue.put(event)
+  }
 
   class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
     override def doWork(): Unit = {