You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/12 16:53:19 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #12973: MINOR: Change KRaft controllership claiming algorithm

mumrah commented on code in PR #12973:
URL: https://github.com/apache/kafka/pull/12973#discussion_r1046093236


##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -166,60 +166,74 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * the migration.
    *
    * To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm
-   * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method,
-   * the conditional update on /controller_epoch fails which causes the whole multi-op transaction to fail.
+   * uses a conditional update on the /controller and /controller_epoch znodes.
+   *
+   * If a new controller is registered concurrently with this registration, one of the two will fail the CAS
+   * operation on /controller_epoch. For KRaft, we have an extra guard against the registered KRaft epoch going
+   * backwards. If a KRaft controller had previously registered, an additional CAS operation is done on the /controller
+   * ZNode to ensure that the KRaft epoch being registered is newer.
    *
    * @param kraftControllerId ID of the KRaft controller node
    * @param kraftControllerEpoch Epoch of the KRaft controller node
-   * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller.
+   * @return An optional of the written epoch and new zkVersion of /controller_epoch. None if we could not register the KRaft controller.
    */
-  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = {
+  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[(Int, Int)] = {
     val timestamp = time.milliseconds()
     val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion))
-    val controllerOpt = getControllerId
-    val controllerEpochToStore = kraftControllerEpoch + 10000000 // TODO Remove this after KAFKA-14436
+    val controllerOpt = getControllerRegistration
+
+    // If we have a KRaft epoch registered in /controller, and it is not _older_ than the requested epoch, throw an error.
+    controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk =>
+      if (kraftEpochInZk >= kraftControllerEpoch) {
+        throw new ControllerMovedException(s"Cannot register KRaft controller $kraftControllerId with epoch $kraftControllerEpoch " +
+          s"as the current controller register in ZK has the same or newer epoch $kraftEpochInZk.")
+      }
+    }
+
     curEpochOpt match {
       case None =>
         throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
           s"since there is no ZK controller epoch present.")
       case Some((curEpoch: Int, curEpochZk: Int)) =>
-        if (curEpoch >= controllerEpochToStore) {
-          // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch an ZK
-          throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " +
-            s"in ZK since its epoch ${controllerEpochToStore} is not higher than the current ZK epoch ${curEpoch}.")
+        // TODO KAFKA-14436 Increase the KRaft epoch to be higher than the ZK epoch
+        val newControllerEpoch = if (kraftControllerEpoch >= curEpoch) {
+          kraftControllerEpoch
+        } else {
+          curEpoch + 1
         }
 
-        val response = if (controllerOpt.isDefined) {
-          info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " +
-            s"controller with epoch $controllerEpochToStore. The previous controller was ${controllerOpt.get}.")
-          retryRequestUntilConnected(
-            MultiRequest(Seq(
-              SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
-              DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion),
-              CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp),
-                defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
-          )
-        } else {
-          info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " +
-            s"controller with epoch $controllerEpochToStore. There was no active controller.")
-          retryRequestUntilConnected(
-            MultiRequest(Seq(
-              SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
-              CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp),
-                defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
-          )
+        val response = controllerOpt match {
+          case Some(controller) =>
+            info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " +
+              s"controller with ZK epoch $newControllerEpoch. The previous controller was ${controller.broker}.")
+            retryRequestUntilConnected(
+              MultiRequest(Seq(
+                SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
+                DeleteOp(ControllerZNode.path, controller.zkVersion),

Review Comment:
   This is the main difference from the old algorithm. Since we are first reading `/controller` to learn the previous KRaft epoch, we use that zkVersion as part of the DeleteOp instead of doing it unconditionally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org