You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/09 00:57:20 UTC

kafka git commit: KAFKA-6146; minimize the number of triggers enqueuing PreferredReplicaLeaderElection events

Repository: kafka
Updated Branches:
  refs/heads/trunk ee1aaa091 -> fef80c863


KAFKA-6146; minimize the number of triggers enqueuing PreferredReplicaLeaderElection events

We currently enqueue a PreferredReplicaLeaderElection controller event in PreferredReplicaElectionHandler's handleCreation, handleDeletion, and handleDataChange. It is sufficient to enqueue the event only upon znode creation and after a preferred replica leader election completes. Processing the latter event re-registers the exists watch on PreferredReplicaElectionZNode and performs any preferred replica leader election that was requested (i.e. whose znode was created) between the completion of the previous election and the registration of the new watch.

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #4189 from onurkaraman/KAFKA-6146


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fef80c86
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fef80c86
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fef80c86

Branch: refs/heads/trunk
Commit: fef80c8636d03a25a3b17f31d9cf2ab9b98f385f
Parents: ee1aaa0
Author: Onur Karaman <ok...@linkedin.com>
Authored: Thu Nov 9 00:57:15 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 9 00:57:15 2017 +0000

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 25 +++++++-----
 .../controller/ControllerIntegrationTest.scala  | 43 ++++++++++++++------
 2 files changed, 45 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fef80c86/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index aa5cf1f..845ac63 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -837,8 +837,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
-    if (!isTriggeredByAutoRebalance)
+    if (!isTriggeredByAutoRebalance) {
       zkClient.deletePreferredReplicaElection()
+      // Ensure we detect future preferred replica leader elections
+      eventManager.put(PreferredReplicaLeaderElection)
+    }
   }
 
   /**
@@ -1361,14 +1364,18 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
     override def process(): Unit = {
       if (!isActive) return
-      zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
-      val partitions = zkClient.getPreferredReplicaElection
-      val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
-      if (partitionsForTopicsToBeDeleted.nonEmpty) {
-        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
-          .format(partitionsForTopicsToBeDeleted))
+
+      // We need to register the watcher if the path doesn't exist in order to detect future preferred replica
+      // leader elections and we get the `path exists` check for free
+      if (zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
+        val partitions = zkClient.getPreferredReplicaElection
+        val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
+        if (partitionsForTopicsToBeDeleted.nonEmpty) {
+          error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
+            .format(partitionsForTopicsToBeDeleted))
+        }
+        onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
       }
-      onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
     }
   }
 
@@ -1480,8 +1487,6 @@ class PreferredReplicaElectionHandler(controller: KafkaController, eventManager:
   override val path: String = PreferredReplicaElectionZNode.path
 
   override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection)
-  override def handleDeletion(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection)
-  override def handleDataChange(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection)
 }
 
 class ControllerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fef80c86/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 99cacb3..d917fe1 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -200,21 +200,23 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   def testPreferredReplicaLeaderElection(): Unit = {
     servers = makeServers(2)
     val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
-    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
     val tp = TopicAndPartition("t", 0)
-    val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
+    val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
     TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
-    servers(otherBrokerId).shutdown()
-    servers(otherBrokerId).awaitShutdown()
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
-      "failed to get expected partition state upon broker shutdown")
-    servers(otherBrokerId).startup()
-    TestUtils.waitUntilTrue(() => zkUtils.getInSyncReplicasForPartition(tp.topic, tp.partition).toSet == assignment(tp.partition).toSet, "restarted broker failed to join in-sync replicas")
-    zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, ZkUtils.preferredReplicaLeaderElectionZkData(Set(tp)))
-    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath),
-      "failed to remove preferred replica leader election path after completion")
-    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2,
-      "failed to get expected partition state upon broker startup")
+    preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch)
+  }
+
+  @Test
+  def testBackToBackPreferredReplicaLeaderElections(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBroker.config.brokerId, controllerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch)
+    preferredReplicaLeaderElection(controllerId, otherBroker, tp, assignment(tp.partition).toSet, LeaderAndIsr.initialLeaderEpoch + 2)
   }
 
   @Test
@@ -291,6 +293,21 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }, "failed to get expected partition state after entire isr went offline")
   }
 
+  private def preferredReplicaLeaderElection(controllerId: Int, otherBroker: KafkaServer, tp: TopicAndPartition,
+                                             replicas: Set[Int], leaderEpoch: Int): Unit = {
+    otherBroker.shutdown()
+    otherBroker.awaitShutdown()
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, leaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+    otherBroker.startup()
+    TestUtils.waitUntilTrue(() => zkUtils.getInSyncReplicasForPartition(tp.topic, tp.partition).toSet == replicas, "restarted broker failed to join in-sync replicas")
+    zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, ZkUtils.preferredReplicaLeaderElectionZkData(Set(tp)))
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath),
+      "failed to remove preferred replica leader election path after completion")
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBroker.config.brokerId, leaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+  }
+
   private def waitUntilControllerEpoch(epoch: Int, message: String): Unit = {
     TestUtils.waitUntilTrue(() => zkUtils.readDataMaybeNull(ZkUtils.ControllerEpochPath)._1.map(_.toInt) == Some(epoch), message)
   }